From e4c4b7bb5c7b5ce972c9c0007a746cdcee6b6a65 Mon Sep 17 00:00:00 2001
From: vernedeng
Date: Sun, 29 Sep 2024 18:35:18 +0800
Subject: [PATCH 1/3] [INLONG-1020][Doc] Add doc of extension connector

---
 .../extension_sort/extension_connector.md | 107 ++++++++++++++++
 .../extension_sort/extension_connector.md | 119 ++++++++++++++++++
 2 files changed, 226 insertions(+)
 create mode 100644 docs/development/extension_sort/extension_connector.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md

diff --git a/docs/development/extension_sort/extension_connector.md b/docs/development/extension_sort/extension_connector.md
new file mode 100644
index 00000000000..bf461fb4ccc
--- /dev/null
+++ b/docs/development/extension_sort/extension_connector.md
@@ -0,0 +1,107 @@
+---
+title: Sort Extension Connector
+sidebar_position: 1
+---
+
+# How to Extend Extract Node and Load Node Types
+
+## Extending Extract & Load Node
+
+### Introduction
+
+The Extract nodes are a set of Source Connectors based on Apache Flink®, used to extract data from different source systems.
+The Load nodes are a set of Sink Connectors based on Apache Flink®, used to load data into different storage systems.
+
+When Apache InLong Sort starts, it translates a set of Extract and Load Node configurations into the corresponding Flink SQL and submits them to the Flink cluster, initiating the data extraction and loading tasks specified by the user.
+
+### Adding Extract & Load Node Definitions
+
+To customize an `Extract Node`, you need to inherit the `org.apache.inlong.sort.protocol.node.ExtractNode` class, and to customize a `Load Node`, you need to inherit the `org.apache.inlong.sort.protocol.node.LoadNode` class. Both selectively implement methods from the `org.apache.inlong.sort.protocol.node.Node` interface as needed.
+
+| Method Name | Meaning | Default Value |
+|--------------------|------------------|--------------------------|
+| getId | Get node ID | Inlong StreamSource Id |
+| getName | Get node name | Inlong StreamSource Name |
+| getFields | Get field information | Fields defined by Inlong Stream |
+| getProperties | Get additional node properties | Empty Map |
+| tableOptions | Get Flink SQL table properties | Additional node properties |
+| genTableName | Generate Flink SQL table name | No default value |
+| getPrimaryKey | Get primary key | null |
+| getPartitionFields | Get partition fields | null |
+
+Additionally, Sort has added two extra interfaces, `InlongMetric` and `Metadata`, to support richer semantics.
+
+#### InlongMetric
+If a custom node needs to report Inlong metrics, it must implement the `org.apache.inlong.sort.protocol.InlongMetric` interface.
+When Sort parses the configuration, it adds the startup parameter `'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` to the table option, which is used to configure Inlong Audit.
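As an aside, the method table and the `InlongMetric` marker can be combined in a custom node roughly as follows. This is only a self-contained illustrative sketch, not the real API: `Node` and `InlongMetric` are redeclared here as minimal stand-ins for the actual `org.apache.inlong.sort.protocol` types, and the `DemoRedisLoadNode` name and the `connector` option value are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for the real org.apache.inlong.sort.protocol types,
// declared here only so the sketch compiles on its own.
interface Node {
    String getId();
    String getName();
    Map<String, String> getProperties();
    Map<String, String> tableOptions();
    String genTableName();
    String getPrimaryKey();
}

interface InlongMetric {
    // marker interface: opting in to Inlong metric reporting
}

// Hypothetical custom Load Node; class name and connector id are illustrative.
class DemoRedisLoadNode implements Node, InlongMetric {

    private final String id;
    private final String name;
    private final Map<String, String> properties;

    DemoRedisLoadNode(String id, String name, Map<String, String> properties) {
        this.id = id;
        this.name = name;
        this.properties = properties == null ? Collections.emptyMap() : properties;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public Map<String, String> getProperties() {
        return properties;
    }

    // tableOptions() feeds the WITH (...) clause of the generated Flink SQL table.
    @Override
    public Map<String, String> tableOptions() {
        Map<String, String> options = new HashMap<>(properties);
        options.put("connector", "redis"); // illustrative connector identifier
        return options;
    }

    @Override
    public String genTableName() {
        return "load_node_" + id;
    }

    @Override
    public String getPrimaryKey() {
        return null; // no primary key by default
    }
}

public class Main {
    public static void main(String[] args) {
        Node node = new DemoRedisLoadNode("1", "demo_redis", null);
        System.out.println(node.genTableName());
        System.out.println(node.tableOptions().get("connector"));
    }
}
```

Because the node implements `InlongMetric`, Sort would also inject the `inlong.metric.labels` option described above into the generated table options.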
+For details, see [How to Integrate Inlong Audit into Custom Connector](#jump1)
+
+#### Metadata
+If a custom node needs to specify a field as a Flink SQL Metadata field, it must implement the `org.apache.inlong.sort.protocol.Metadata` interface.
+Sort will automatically mark the corresponding field as Metadata when parsing the configuration.
+
+## Extending Apache Flink Connector
+
+Sort is implemented based on Apache Flink version 1.15. For information on how to extend the Apache Flink Connector, refer to User-defined Sources & Sinks.
+
+## How to Integrate Inlong Audit into Custom Connector
+Inlong Sort encapsulates the metric reporting process in the `org.apache.inlong.sort.base.metric.SourceExactlyMetric` and `org.apache.inlong.sort.base.metric.SinkExactlyMetric` classes. Developers only need to initialize the corresponding `Metric` object according to the Source/Sink type to implement metric reporting.
+
+The common practice is to pass parameters such as the InLong Audit address when constructing the Source/Sink, and initialize the `SourceExactlyMetric/SinkExactlyMetric` object when calling the open() method to initialize the Source/Sink operator. After processing the actual data, call the corresponding audit reporting method.
+
+```java
+public class StarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);
+
+    private transient SinkExactlyMetric sinkExactlyMetric;
+
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+    private String stateKey;
+
+    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            StarRocksIRowTransformer rowTransformer, String inlongMetric,
+            String auditHostAndPorts, String auditKeys) {
+        this.sinkOptions = sinkOptions;
+
+        // pass the params of inlong audit
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.inlongMetric = inlongMetric;
+        this.auditKeys = auditKeys;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+
+        // init SinkExactlyMetric in open()
+        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        if (metricOption != null) {
+            sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void invoke(T value, Context context)
+            throws IOException, ClassNotFoundException, JSQLParserException {
+        Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
+
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializer.serialize(schemaUtils.filterOutTimeField(data)));
+
+        // output audit after write data to sink
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
+        }
+    }
+}
+```

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
new file mode 100644
index 00000000000..dfa1cf71e08
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
@@ -0,0 +1,119 @@
+---
+title: Sort 扩展 Connector
+sidebar_position: 1
+---
+
+# 如何扩展 Extract Node 和 Load Node 类型
+
+## 扩展 Extract & Load Node
+
+### 简介
+
+Extract 节点是基于 Apache Flink® 的
+Source Connectors,用于从不同的源系统抽取数据。
+Load 节点是基于 Apache Flink® 的 Sink
+Connectors,用于将数据加载到不同的存储系统。
+
+Apache InLong Sort 启动时通过将一组 Extract 和 Load Node 配置翻译成对应的 Flink SQL 并提交到 Flink
+集群,拉起用户指定的数据抽取和入库任务。
+
+### 增加 Extract & Load Node 定义
+
+自定义 `Extract Node` 需要继承 `org.apache.inlong.sort.protocol.node.ExtractNode`
+类,自定义 `Load Node` 需要继承 `org.apache.inlong.sort.protocol.node.LoadNode` 类,
+两者都需要选择性实现 `org.apache.inlong.sort.protocol.node.Node` 接口中的方法。
+
+| 方法名 | 含义 | 默认值 |
+|--------------------|------------------|--------------------------|
+| getId | 获取节点 ID | Inlong StreamSource Id |
+| getName | 获取节点名 | Inlong StreamSource Name |
+| getFields | 获取字段信息 | Inlong Stream 定义的字段 |
+| getProperties | 获取节点额外属性 | 空 Map |
+| tableOptions | 获取 Flink SQL 表属性 | 节点额外属性 |
+| genTableName | 生成 Flink SQL 表名 | 无默认值 |
+| getPrimaryKey | 获取主键 | null |
+| getPartitionFields | 获取分区字段 | null |
+
+同时,Sort 还增加了 `InlongMetric` 和 `Metadata` 两个额外的接口用来支持更丰富的语义。
+
+#### InlongMetric
+如果自定义节点需要上报 Inlong 指标,则需要实现 `org.apache.inlong.sort.protocol.InlongMetric` 接口。
+Inlong Sort 解析配置时会向 table option 中增加
+`'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` 启动参数,并以此来配置 Inlong Audit。
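`inlong.metric.labels` 的取值是以 `&` 连接的 `key=value` 标签串。下面给出一段独立的示意代码帮助理解该格式(仅为示意:`parseLabels` 为本文假设的方法名,并非 Sort 的内部实现):

```java
import java.util.HashMap;
import java.util.Map;

// 示意:解析形如 groupId={g}&streamId={s}&nodeId={n} 的 inlong.metric.labels 标签串
public class MetricLabelDemo {

    // parseLabels 为假设的方法名,仅用于演示标签格式
    static Map<String, String> parseLabels(String labels) {
        Map<String, String> result = new HashMap<>();
        for (String kv : labels.split("&")) {
            String[] parts = kv.split("=", 2);
            result.put(parts[0], parts.length > 1 ? parts[1] : "");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> labels = parseLabels("groupId=g1&streamId=s1&nodeId=n1");
        System.out.println(labels.get("groupId"));
        System.out.println(labels.get("streamId"));
        System.out.println(labels.get("nodeId"));
    }
}
```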
+详情请查看
+[如何集成 Inlong Audit 到自定义 Connector](#jump1)
+
+#### Metadata
+如果自定义节点需要指定某个字段为 Flink SQL 的 Metadata 字段,则需要实现 `org.apache.inlong.sort.protocol.Metadata` 接口。
+Inlong Sort 解析配置时会自动将对应的字段标记为 Metadata。
+
+## 扩展 Apache Flink Connector
+
+Sort 基于 Apache Flink 1.15 版本实现。如何扩展 Apache Flink Connector
+可以参考 User-defined Sources & Sinks。
+
+## 如何集成 Inlong Audit 到自定义 Connector
+Inlong Sort 将指标上报的流程封装在了 `org.apache.inlong.sort.base.metric.SourceExactlyMetric`
+和 `org.apache.inlong.sort.base.metric.SinkExactlyMetric` 类中。开发者只需要根据 Source/Sink 类型初始化对应的 `Metric` 对象,即可实现指标上报。
+
+通常的做法是在构造 Source/Sink 时传入 InLong Audit 地址等参数,在初始化 Source/Sink 算子调用 open() 方法时初始化 `SourceExactlyMetric/SinkExactlyMetric` 对象,
+在处理实际数据后再调用对应的审计上报方法。
+
+```
+public class StarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);
+
+    private transient SinkExactlyMetric sinkExactlyMetric;
+
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+    private String stateKey;
+
+    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            StarRocksIRowTransformer rowTransformer, String inlongMetric,
+            String auditHostAndPorts, String auditKeys) {
+        this.sinkOptions = sinkOptions;
+
+        // pass the params of inlong audit
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.inlongMetric = inlongMetric;
+        this.auditKeys = auditKeys;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+
+        // init SinkExactlyMetric in open()
+        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        if (metricOption != null) {
+            sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void invoke(T value, Context context)
+            throws IOException, ClassNotFoundException, JSQLParserException {
+        Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
+
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializer.serialize(schemaUtils.filterOutTimeField(data)));
+
+        // output audit after write data to sink
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
+        }
+    }
+}
+```

From d5c9c38ba4341a8392809e6b95394b5cb6ee1197 Mon Sep 17 00:00:00 2001
From: vernedeng
Date: Sun, 29 Sep 2024 23:17:03 +0800
Subject: [PATCH 2/3] fix

---
 docs/development/extension_sort/extension_connector.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/development/extension_sort/extension_connector.md b/docs/development/extension_sort/extension_connector.md
index bf461fb4ccc..55ce4e0bdca 100644
--- a/docs/development/extension_sort/extension_connector.md
+++ b/docs/development/extension_sort/extension_connector.md
@@ -49,7 +49,7 @@ Inlong Sort encapsulates the metric reporting process in the `org.apache.inlong.
 The common practice is to pass parameters such as the InLong Audit address when constructing the Source/Sink, and initialize the `SourceExactlyMetric/SinkExactlyMetric` object when calling the open() method to initialize the Source/Sink operator. After processing the actual data, call the corresponding audit reporting method.
 
-```java
+```
 public class StarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase {
 
     private static final long serialVersionUID = 1L;

From 12d0529dac6209f501f18c0d3cf30e90544757f8 Mon Sep 17 00:00:00 2001
From: vernedeng
Date: Mon, 30 Sep 2024 00:02:38 +0800
Subject: [PATCH 3/3] fix

---
 docs/development/extension_sort/extension_connector.md        | 4 ++--
 .../current/development/extension_sort/extension_connector.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/development/extension_sort/extension_connector.md b/docs/development/extension_sort/extension_connector.md
index 55ce4e0bdca..aea65328cfb 100644
--- a/docs/development/extension_sort/extension_connector.md
+++ b/docs/development/extension_sort/extension_connector.md
@@ -32,8 +32,8 @@ To customize an `Extract Node`, you need to inherit the `org.apache.inlong.sort.
 Additionally, Sort has added two extra interfaces, `InlongMetric` and `Metadata`, to support richer semantics.
 
 #### InlongMetric
-If a custom node needs to report Inlong metrics, it must implement the `org.apache.inlong.sort.protocol.InlongMetric` interface.
+If a custom node needs to report Inlong metrics, it must implement the `org.apache.inlong.sort.protocol.InlongMetric` interface.
-When Sort parses the configuration, it adds the startup parameter `'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` to the table option, which is used to configure Inlong Audit.
+When Sort parses the configuration, it adds the startup parameter `'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` to the table option, which is used to configure Inlong Audit.
 For details, see [How to Integrate Inlong Audit into Custom Connector](#jump1)
 
 #### Metadata
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
index dfa1cf71e08..4a2faebacfd 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
@@ -37,9 +37,9 @@ Apache InLong Sort 启动时通过将一组 Extract 和 Load Node 配置翻译
 同时,Sort 还增加了 `InlongMetric` 和 `Metadata` 两个额外的接口用来支持更丰富的语义。
 
 #### InlongMetric
-如果自定义节点需要上报 Inlong 指标,则需要实现 `org.apache.inlong.sort.protocol.InlongMetric` 接口。
+如果自定义节点需要上报 Inlong 指标,则需要实现 `org.apache.inlong.sort.protocol.InlongMetric` 接口。
 Inlong Sort 解析配置时会向 table option 中增加
-`'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` 启动参数,并以此来配置 Inlong Audit。
+`'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}'` 启动参数,并以此来配置 Inlong Audit。
 详情请查看
 [如何集成 Inlong Audit 到自定义 Connector](#jump1)