[INLONG-1020][Doc] Add doc of extension connector (#1036)
* [INLONG-1020][Doc] Add doc of extension connector
---------

Co-authored-by: vernedeng <[email protected]>
vernedeng authored Sep 29, 2024
1 parent cf02dc5 commit a5e739d
Showing 2 changed files with 226 additions and 0 deletions.
docs/development/extension_sort/extension_connector.md (107 additions, 0 deletions)
@@ -0,0 +1,107 @@
---
title: Sort Extension Connector
sidebar_position: 1
---

# How to Extend Extract Node and Load Node Types

## Extending Extract & Load Node

### Introduction

Extract nodes are a set of Source Connectors based on <a href="https://flink.apache.org/">Apache Flink<sup>®</sup></a>, used to extract data from different source systems.
Load nodes are a set of Sink Connectors based on <a href="https://flink.apache.org/">Apache Flink<sup>®</sup></a>, used to load data into different storage systems.

When Apache InLong Sort starts, it translates a set of Extract and Load Node configurations into corresponding Flink SQL and submits them to the Flink cluster, initiating the data extraction and loading tasks specified by the user.

### Adding Extract & Load Node Definitions

To customize an `Extract Node`, inherit the `org.apache.inlong.sort.protocol.node.ExtractNode` class; to customize a `Load Node`, inherit the `org.apache.inlong.sort.protocol.node.LoadNode` class. Both need to implement the relevant methods of the `org.apache.inlong.sort.protocol.node.Node` interface, listed in the table below; a minimal sketch follows the table.

| Method Name | Meaning | Default Value |
|--------------------|------------------|--------------------------|
| getId | Get node ID | Inlong StreamSource Id |
| getName | Get node name | Inlong StreamSource Name |
| getFields | Get field information | Fields defined by Inlong Stream |
| getProperties | Get additional node properties | Empty Map |
| tableOptions | Get Flink SQL table properties | Additional node properties |
| genTableName | Generate Flink SQL table name | No default value |
| getPrimaryKey | Get primary key | null |
| getPartitionFields | Get partition fields | null |
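
For illustration, here is a minimal sketch of a custom Extract Node for a hypothetical `demo` connector. The `DemoExtractNode` class, the `demo` identifier, and the exact `ExtractNode` constructor arguments are assumptions for this example, not part of InLong; check the protocol classes in the InLong source for the actual signatures.

```java
import java.util.List;
import java.util.Map;

import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.node.ExtractNode;

// Hypothetical custom Extract Node; the superclass constructor shown here
// (id, name, fields, watermark field, properties) is an assumption.
public class DemoExtractNode extends ExtractNode implements InlongMetric {

    private final String hostname;

    public DemoExtractNode(String id, String name, List<FieldInfo> fields,
            Map<String, String> properties, String hostname) {
        super(id, name, fields, null, properties);
        this.hostname = hostname;
    }

    @Override
    public String genTableName() {
        // name of the Flink SQL table generated for this node
        return "demo_table_" + getId();
    }

    @Override
    public Map<String, String> tableOptions() {
        // per the table above, the default is the node's additional properties;
        // add the connector-specific options on top of them
        Map<String, String> options = super.tableOptions();
        options.put("connector", "demo");
        options.put("hostname", hostname);
        return options;
    }
}
```

When Sort parses the configuration, such a node is rendered as a `CREATE TABLE` statement with `'connector' = 'demo'` in the generated Flink SQL.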

Additionally, Sort has added two extra interfaces, `InlongMetric` and `Metadata`, to support richer semantics.

#### InlongMetric
If a custom node needs to report Inlong metrics, it must implement the `org.apache.inlong.sort.protocol.InlongMetric` interface.
When Sort parses the configuration, it adds the startup parameter `'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` to the table options, which is used to configure Inlong Audit.
For details, see [How to Integrate Inlong Audit into a Custom Connector](#jump1).
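
On the connector side, the factory can declare and read the injected label. This is a hedged sketch: the option key mirrors what the text above describes, and `inlong-sort-base` exposes an equivalent constant (`Constants.INLONG_METRIC`), but treat the exact constant location and the `DemoMetricOptions` class as assumptions.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

public final class DemoMetricOptions {

    // the label key that Sort injects for nodes implementing InlongMetric
    public static final ConfigOption<String> INLONG_METRIC =
            ConfigOptions.key("inlong.metric.labels")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("groupId={g}&streamId={s}&nodeId={n} labels used to configure Inlong Audit");

    private DemoMetricOptions() {
    }

    // call this from a DynamicTableFactory, e.g. inside createDynamicTableSink(),
    // and pass the result on to the source/sink function
    public static String readMetricLabels(ReadableConfig config) {
        return config.getOptional(INLONG_METRIC).orElse(null);
    }
}
```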

#### Metadata
If a custom node needs to specify a field as a Flink SQL Metadata field, it must implement the `org.apache.inlong.sort.protocol.Metadata` interface.
Sort will automatically mark the corresponding field as Metadata when parsing the configuration.
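
The exact contract of `Metadata` should be checked in the inlong-sort protocol module; as a rough, hedged sketch, assuming the interface exposes `supportedMetaFields()` and `isVirtual()` over a `MetaField` enum, an implementation might look like this:

```java
import java.util.EnumSet;
import java.util.Set;

import org.apache.inlong.sort.protocol.Metadata;
// the package of the MetaField enum is an assumption; locate it in the protocol module
import org.apache.inlong.sort.protocol.enums.MetaField;

public class DemoMetadataNode implements Metadata {

    @Override
    public Set<MetaField> supportedMetaFields() {
        // meta fields this connector can expose as Flink SQL METADATA columns
        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.DATABASE_NAME, MetaField.TABLE_NAME);
    }

    @Override
    public boolean isVirtual(MetaField metaField) {
        // virtual metadata columns are readable but never written back
        return true;
    }
}
```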

## Extending Apache Flink Connector

Sort is implemented based on Apache Flink 1.15. For how to extend an Apache Flink Connector, refer to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/">User-defined Sources & Sinks</a>.
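
For orientation, here is a skeleton of a custom Flink 1.15 `DynamicTableSinkFactory`. The `demo` identifier and class name are placeholders; the factory must also be registered via Java SPI in `META-INF/services/org.apache.flink.table.factories.Factory`, and the actual sink construction is omitted.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class DemoDynamicTableSinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        // the value users put in the 'connector' table option
        return "demo";
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // validate the declared options against the table's WITH (...) clause
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        // build the actual DynamicTableSink from helper.getOptions(); omitted here
        throw new UnsupportedOperationException("sink construction omitted in this sketch");
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```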

## <span id="jump1">How to Integrate Inlong Audit into Custom Connector</span>
Inlong Sort encapsulates the metric reporting process in the `org.apache.inlong.sort.base.metric.SourceExactlyMetric` and `org.apache.inlong.sort.base.metric.SinkExactlyMetric` classes. Developers only need to initialize the corresponding `Metric` object according to the Source/Sink type to implement metric reporting.

The common practice is to pass parameters such as the InLong Audit address when constructing the Source/Sink, initialize the `SourceExactlyMetric`/`SinkExactlyMetric` object in the open() method of the Source/Sink operator, and call the corresponding audit-reporting method after processing actual data, as in the excerpt below (adapted from the StarRocks connector).

```java
public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunctionBase<T> {

    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);

    private transient SinkExactlyMetric sinkExactlyMetric;
    private String inlongMetric;
    private String auditHostAndPorts;
    private String auditKeys;
    private String stateKey;

    // imports and the other members of the real connector class (sinkOptions,
    // schema, rowTransformer, sinkManager, serializer, schemaUtils, ...) are
    // omitted here for brevity

    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
            TableSchema schema,
            StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
            String auditHostAndPorts, String auditKeys) {
        this.sinkOptions = sinkOptions;
        // pass the parameters needed by InLong Audit
        this.auditHostAndPorts = auditHostAndPorts;
        this.inlongMetric = inlongMetric;
        this.auditKeys = auditKeys;
    }

    @Override
    public void open(Configuration parameters) {
        // init SinkExactlyMetric in open(); the builder may return null
        // when no metric labels are configured
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels(inlongMetric)
                .withAuditAddress(auditHostAndPorts)
                .withAuditKeys(auditKeys)
                .build();
        if (metricOption != null) {
            sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
        }
    }

    @Override
    public void invoke(T value, Context context)
            throws IOException, ClassNotFoundException, JSQLParserException {
        Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
        sinkManager.write(
                null,
                sinkOptions.getDatabaseName(),
                sinkOptions.getTableName(),
                serializer.serialize(schemaUtils.filterOutTimeField(data)));
        // report audit metrics after writing the data to the sink
        if (sinkExactlyMetric != null) {
            sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
        }
    }
}
```
@@ -0,0 +1,119 @@
---
title: Sort Extension Connector
sidebar_position: 1
---

# How to Extend Extract Node and Load Node Types

## Extending Extract & Load Node

### Introduction

Extract nodes are a set of Source Connectors based on <a href="https://flink.apache.org/">Apache Flink<sup>®</sup></a>, used to extract data from different source systems.
Load nodes are a set of Sink Connectors based on <a href="https://flink.apache.org/">Apache Flink<sup>®</sup></a>, used to load data into different storage systems.

When Apache InLong Sort starts, it translates a set of Extract and Load Node configurations into the corresponding Flink SQL and submits it to the Flink cluster, launching the data extraction and loading tasks specified by the user.

### Adding Extract & Load Node Definitions

To customize an `Extract Node`, inherit the `org.apache.inlong.sort.protocol.node.ExtractNode` class; to customize a `Load Node`, inherit the `org.apache.inlong.sort.protocol.node.LoadNode` class. Both need to implement the relevant methods of the `org.apache.inlong.sort.protocol.node.Node` interface.

| Method Name | Meaning | Default Value |
|--------------------|--------------------------------|---------------------------------|
| getId | Get node ID | Inlong StreamSource Id |
| getName | Get node name | Inlong StreamSource Name |
| getFields | Get field information | Fields defined by Inlong Stream |
| getProperties | Get additional node properties | Empty Map |
| tableOptions | Get Flink SQL table properties | Additional node properties |
| genTableName | Generate Flink SQL table name | No default value |
| getPrimaryKey | Get primary key | null |
| getPartitionFields | Get partition fields | null |

Sort also provides two extra interfaces, `InlongMetric` and `Metadata`, to support richer semantics.

#### InlongMetric
If a custom node needs to report Inlong metrics, it must implement the `org.apache.inlong.sort.protocol.InlongMetric` interface.
When parsing the configuration, Inlong Sort adds the startup parameter `'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` to the table options, which is used to configure Inlong Audit.
For details, see [How to Integrate Inlong Audit into a Custom Connector](#jump1).

#### Metadata
If a custom node needs to designate a field as a Flink SQL Metadata field, it must implement the `org.apache.inlong.sort.protocol.Metadata` interface.
Inlong Sort automatically marks the corresponding field as Metadata when parsing the configuration.

## Extending Apache Flink Connector

Sort is implemented based on Apache Flink 1.15. For how to extend an Apache Flink Connector, refer to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sourcessinks/">User-defined Sources & Sinks</a>.

## <span id="jump1">How to Integrate Inlong Audit into a Custom Connector</span>
Inlong Sort encapsulates the metric-reporting workflow in the `org.apache.inlong.sort.base.metric.SourceExactlyMetric` and `org.apache.inlong.sort.base.metric.SinkExactlyMetric` classes. Developers only need to initialize the corresponding `Metric` object for the Source/Sink type to enable metric reporting.

The common practice is to pass parameters such as the InLong Audit address when constructing the Source/Sink, initialize the `SourceExactlyMetric`/`SinkExactlyMetric` object in the open() method of the Source/Sink operator, and call the corresponding audit-reporting method after processing actual data, as in the excerpt below (adapted from the StarRocks connector).

```java
public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunctionBase<T> {

    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);

    private transient SinkExactlyMetric sinkExactlyMetric;
    private String inlongMetric;
    private String auditHostAndPorts;
    private String auditKeys;
    private String stateKey;

    // imports and the other members of the real connector class (sinkOptions,
    // schema, rowTransformer, sinkManager, serializer, schemaUtils, ...) are
    // omitted here for brevity

    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
            TableSchema schema,
            StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
            String auditHostAndPorts, String auditKeys) {
        this.sinkOptions = sinkOptions;
        // pass the parameters needed by InLong Audit
        this.auditHostAndPorts = auditHostAndPorts;
        this.inlongMetric = inlongMetric;
        this.auditKeys = auditKeys;
    }

    @Override
    public void open(Configuration parameters) {
        // init SinkExactlyMetric in open(); the builder may return null
        // when no metric labels are configured
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels(inlongMetric)
                .withAuditAddress(auditHostAndPorts)
                .withAuditKeys(auditKeys)
                .build();
        if (metricOption != null) {
            sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
        }
    }

    @Override
    public void invoke(T value, Context context)
            throws IOException, ClassNotFoundException, JSQLParserException {
        Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
        sinkManager.write(
                null,
                sinkOptions.getDatabaseName(),
                sinkOptions.getTableName(),
                serializer.serialize(schemaUtils.filterOutTimeField(data)));
        // report audit metrics after writing the data to the sink
        if (sinkExactlyMetric != null) {
            sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
        }
    }
}
```
