[INLONG-1055][Doc] Add documentation for developing user-defined flink metrics (#1056)
PeterZh6 authored Oct 16, 2024
1 parent fbc674b commit b60ae76
Showing 2 changed files with 285 additions and 0 deletions.
150 changes: 150 additions & 0 deletions docs/development/extension_sort/custom_flink_metrics.md
@@ -0,0 +1,150 @@
---
title: Customize Flink Metrics in Sort
sidebar_position: 4
---

# Customize Flink Metrics in Sort-Connector

## Overview

The InLong Sort framework allows users to define and insert custom Flink metrics within different connectors to monitor the data processing pipeline closely. These custom metrics are generally used to track key performance indicators such as serialization/deserialization success/failure counts, latency, snapshot states, transaction completion statuses, etc. These metrics are recorded and reported through `SourceExactlyMetric` and `SinkExactlyMetric` objects at the appropriate logic nodes.

## Steps to Insert Custom Flink Metrics

Adding a custom Flink metric to a new connector typically takes the six steps below. They use the deserialization error count (`numDeserializeError`) in `inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc` as a running example.

### 1. Create the Metric Object

First, add a new Flink metric object in the `SourceExactlyMetric` or `SinkExactlyMetric` class. Metric objects can typically be of types like `Counter`, `Gauge`, or `Meter`. In this example, a `Counter` is created to track deserialization errors and is added as a class member:

```java
private Counter numDeserializeError;
```

### 2. Implement the `registerMetricsForXXX` Method

To initialize and register this metric object, write a `registerMetricsForNumDeserializeError` method that passes the `Counter` to `registerCounter`:

```java
public void registerMetricsForNumDeserializeError(Counter counter) {
    numDeserializeError = registerCounter("numDeserializeError", counter);
}
```

In this method, the custom `Counter` object is linked to Flink's metric system using `registerCounter`, ensuring that the metric is properly recorded during data processing.
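
To make the registration flow concrete, here is a minimal, self-contained sketch. It assumes that `registerCounter` simply delegates to the metric group and returns the same counter instance. `Counter`, `SimpleCounter`, `FakeMetricGroup`, and `RegistrationSketch` are hypothetical stand-ins defined here so the example compiles without Flink; they are not the InLong or Flink classes themselves.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's Counter interface.
interface Counter {
    void inc();
    long getCount();
}

// Hypothetical single-threaded counter used only for this sketch.
class SimpleCounter implements Counter {
    private long count;
    @Override public void inc() { count++; }
    @Override public long getCount() { return count; }
}

// Hypothetical stand-in for a metric group: stores counters by name.
class FakeMetricGroup {
    private final Map<String, Counter> counters = new HashMap<>();

    // Stores the counter under the given name and returns the same instance.
    <C extends Counter> C counter(String name, C counter) {
        counters.put(name, counter);
        return counter;
    }

    Counter lookup(String name) { return counters.get(name); }
}

// Reduced version of a metric holder class with one custom counter.
class RegistrationSketch {
    private final FakeMetricGroup metricGroup;
    private Counter numDeserializeError;

    RegistrationSketch(FakeMetricGroup metricGroup) {
        this.metricGroup = metricGroup;
    }

    // Assumed behavior: delegate to the metric group and hand back the counter.
    private Counter registerCounter(String name, Counter counter) {
        return metricGroup.counter(name, counter);
    }

    void registerMetricsForNumDeserializeError(Counter counter) {
        numDeserializeError = registerCounter("numDeserializeError", counter);
    }

    Counter getNumDeserializeError() { return numDeserializeError; }
}
```

Because the registered instance is also the one stored in the field, later increments through the field are visible to whatever reads the metric group.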

### 3. Call the Registration Method in the Constructor

Within the class constructor, call the registration method with the `MetricOption` and `MetricGroup` parameters. This ensures the metric object is properly initialized and registered upon instantiation:

```java
public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
    this.metricGroup = metricGroup;
    this.labels = option.getLabels();
    registerMetricsForNumDeserializeError(new ThreadSafeCounter());
}
```

By calling the `registerMetricsForNumDeserializeError` method in the constructor, the `numDeserializeError` counter is initialized and ready to record deserialization errors upon each instantiation.
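
The constructor passes a `ThreadSafeCounter`, presumably because the counter may be updated from more than one thread. The sketch below illustrates that idea with an `AtomicLong`; `AtomicCounterSketch` and `AtomicCounterDemo` are hypothetical names, and InLong's real `ThreadSafeCounter` may be implemented differently.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical thread-safe counter backed by an AtomicLong.
class AtomicCounterSketch {
    private final AtomicLong count = new AtomicLong();

    void inc() { count.incrementAndGet(); }
    long getCount() { return count.get(); }
}

class AtomicCounterDemo {
    // Increments one shared counter from several threads and returns the final count.
    static long countFromThreads(int threads, int incrementsPerThread) {
        AtomicCounterSketch counter = new AtomicCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        try {
            // Wait for every worker so that all increments are visible.
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countFromThreads(4, 10_000)); // prints 40000
    }
}
```

With a plain `long` field and `count++`, concurrent increments could be lost; the atomic update avoids that.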

### 4. Write the Metric's Getter, Setter, and Operation Methods

To manipulate the `numDeserializeError` counter externally, implement the necessary getter and operation methods. In this case, an `incNumDeserializeError` method increments the counter whenever a deserialization error occurs:

```java
public void incNumDeserializeError() {
    if (numDeserializeError != null) {
        numDeserializeError.inc();
    }
}
```

Callers invoke `incNumDeserializeError` whenever a deserialization error is detected. The null check keeps the call safe even if the counter was never registered.

### 5. Add the New Metric Output in the `toString` Method

To aid debugging and monitoring, and to keep the class complete, include the custom metric in the `toString` output:

```java
@Override
public String toString() {
    return "SourceMetricData{"
            // ... other existing fields ...
            + ", numDeserializeError=" + numDeserializeError.getCount()
            + "}";
}
```

### 6. Insert the Custom Metric in Appropriate Places

After registering and initializing the metric, invoke it at the appropriate logic node. In this example, call `incNumDeserializeError` in the deserialization method to track each deserialization error. The following code shows how to implement this:

```java
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    try {
        // Execute deserialization logic
    } catch (Exception e) {
        // Increment error count on deserialization failure
        // Ensure sourceExactlyMetric is not null
        if (sourceExactlyMetric != null) {
            sourceExactlyMetric.incNumDeserializeError();
        }
        throw e;
    }
}
```

This method ensures that each deserialization error triggers `incNumDeserializeError`, accurately reflecting error frequency.
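
The count-then-rethrow pattern can be tried out without Flink. The following sketch uses `Integer.parseInt` as a placeholder for the real deserialization logic; `ErrorMetricSketch`, `DeserializerSketch`, and `DeserializerDemo` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the metric class, reduced to one counter.
class ErrorMetricSketch {
    private final AtomicLong numDeserializeError = new AtomicLong();

    void incNumDeserializeError() { numDeserializeError.incrementAndGet(); }
    long getNumDeserializeError() { return numDeserializeError.get(); }
}

class DeserializerSketch {
    private final ErrorMetricSketch metric; // may be null when metrics are disabled

    DeserializerSketch(ErrorMetricSketch metric) { this.metric = metric; }

    // Parses the record as an integer; on failure, counts the error and rethrows it.
    void deserialize(String record, List<Integer> out) {
        try {
            out.add(Integer.parseInt(record));
        } catch (NumberFormatException e) {
            if (metric != null) {
                metric.incNumDeserializeError();
            }
            throw e;
        }
    }
}

class DeserializerDemo {
    // Feeds every record through the deserializer and returns the error count.
    static long countErrors(String... records) {
        ErrorMetricSketch metric = new ErrorMetricSketch();
        DeserializerSketch deserializer = new DeserializerSketch(metric);
        List<Integer> out = new ArrayList<>();
        for (String record : records) {
            try {
                deserializer.deserialize(record, out);
            } catch (NumberFormatException ignored) {
                // The metric is already updated; the caller decides how to proceed.
            }
        }
        return metric.getNumDeserializeError();
    }
}
```

For example, `DeserializerDemo.countErrors("1", "x", "2", "")` returns 2, because `"x"` and the empty string fail to parse. Rethrowing keeps the original failure visible to the caller, so the metric adds observability without changing error handling.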

## Testing and Verification

Use the `sort-end-to-end-tests` module, located in the `inlong-sort/sort-end-to-end-tests/` directory, to verify the metric:

1. **Set Metric Labels in SQL**: Add `inlong.metric.labels` in the test SQL file to ensure Flink recognizes the metric labels. For example, in `sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql`:

```sql
CREATE TABLE test_input1 (
`id` INT primary key,
name STRING,
description STRING
) WITH (
'connector' = 'postgres-cdc-inlong',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'test',
'table-name' = 'test_input1',
'schema-name' = 'public',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'inlong_slot',
'debezium.slot.name' = 'inlong_slot',
-- Added portion
'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
);

-- Keep Flink SQL for sink unchanged
```
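
The `inlong.metric.labels` value is a set of `key=value` pairs joined by `&`. The sketch below only illustrates that format by splitting it into a map; it is not InLong's own parser, and `MetricLabelsSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MetricLabelsSketch {
    // Splits "k1=v1&k2=v2" into an ordered map; rejects entries without a key.
    static Map<String, String> parse(String labels) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : labels.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed label: " + pair);
            }
            result.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("groupId=pggroup&streamId=pgStream&nodeId=pgNode"));
        // prints {groupId=pggroup, streamId=pgStream, nodeId=pgNode}
    }
}
```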

2. **Configure Log Output for Metric Viewing**: Enable metric log output in the test environment configuration to view results on the console:

```properties
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 5 SECONDS
```

3. **Run the end-to-end Test and Verify Output**: Run the specific end-to-end test under path `inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15` and check whether `numDeserializeError` is the expected value:

```bash
mvn test -Dtest=Postgres2StarRocksTest
```

Note: You may want to insert test code or construct specific data to trigger `incNumDeserializeError()` and confirm that your metrics work as expected.

## Notes

* **Pass `MetricGroup` When Creating Metrics**: Ensure that when creating `SourceExactlyMetric` or `SinkExactlyMetric`, you pass a `MetricGroup` obtained via `runtimeContext` to avoid registration failures.
* **Check for Non-Null `MetricOption`**: Validate that `MetricOption` is non-null before creating metric objects to avoid null pointer exceptions due to missing `inlong.metric.labels`.
* **Handle Null Pointers**: Check for null `SourceExactlyMetric` or `SinkExactlyMetric` objects when operating on custom metrics like `incNumDeserializeSuccess()` to avoid null pointer exceptions if `'inlong.metric.labels'` isn’t specified.
* **End-to-end Test Coverage**: If a new connector metric isn’t covered by an end-to-end test, create a test to verify metric reporting functionality.
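
The null-handling notes above can be sketched as follows: the metric object is created only when an option is present, and every call site guards against a missing metric. `OptionSketch`, `GuardedMetricSketch`, and `GuardedOperatorSketch` are hypothetical stand-ins, not the InLong classes, and the sketch assumes a missing `inlong.metric.labels` surfaces as a null option.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for MetricOption.
class OptionSketch {
    final String labels;

    OptionSketch(String labels) { this.labels = labels; }
}

// Hypothetical stand-in for the metric class, reduced to one counter.
class GuardedMetricSketch {
    private final AtomicLong numDeserializeError = new AtomicLong();

    void incNumDeserializeError() { numDeserializeError.incrementAndGet(); }
    long getNumDeserializeError() { return numDeserializeError.get(); }
}

class GuardedOperatorSketch {
    private final GuardedMetricSketch metric; // null when no labels were configured

    // Creates the metric object only when an option is present.
    GuardedOperatorSketch(OptionSketch option) {
        this.metric = (option != null) ? new GuardedMetricSketch() : null;
    }

    // Safe to call whether or not metrics are enabled.
    void onDeserializeError() {
        if (metric != null) {
            metric.incNumDeserializeError();
        }
    }

    // Returns -1 when metrics are disabled.
    long errorCount() {
        return metric == null ? -1 : metric.getNumDeserializeError();
    }
}
```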

With these steps, you can add custom Flink metrics to the Postgres connector and verify them through testing, improving observability.
@@ -0,0 +1,135 @@
---
title: Customize Flink Metrics in Sort
sidebar_position: 4
---

# Customize Flink Metrics in Sort-Connector

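## Overview

The InLong Sort framework lets users define and insert custom Flink metrics in different connectors to monitor data processing in depth. Custom metrics typically measure key performance indicators such as (de)serialization success/failure counts, latency, snapshot state, and transaction completion status. They are recorded and reported through `SourceExactlyMetric` and `SinkExactlyMetric` objects at the appropriate logic nodes.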

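## Steps to Insert Custom Flink Metrics

Creating and inserting a custom Flink metric in a new connector typically takes the steps below. They use the deserialization error count `numDeserializeError` and `inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc` as an example to show in detail how to insert a custom metric in the InLong Sort framework.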

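### 1. Create the Metric Object

First, add a new Flink metric object to the `SourceExactlyMetric` or `SinkExactlyMetric` class. Metric objects are usually of type `Counter`, `Gauge`, or `Meter`. This example creates a `Counter` that records deserialization errors and adds it as a class member:

    private Counter numDeserializeError;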

### 2. Implement the `registerMetricsForXXX` Method

To initialize and register this metric object, write a `registerMetricsForNumDeserializeError` method. In this method, `registerCounter` registers the `Counter` object with Flink's metric system so that the system can track the metric.

    public void registerMetricsForNumDeserializeError(Counter counter) {
        numDeserializeError = registerCounter("numDeserializeError", counter);
    }

Calling `registerCounter` links the custom `Counter` object to Flink's metric system and ensures that the metric is recorded correctly during subsequent data processing.

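### 3. Call the Registration Method in the Constructor

In the class constructor, call the registration method written above, using the `MetricOption` and `MetricGroup` parameters that are passed in. This ensures that the metric object is correctly initialized and registered on instantiation:

    public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
        this.metricGroup = metricGroup;
        this.labels = option.getLabels();
        registerMetricsForNumDeserializeError(new ThreadSafeCounter());
    }

Calling `registerMetricsForNumDeserializeError` in the constructor ensures that the `numDeserializeError` counter is initialized on every instantiation and is ready to record deserialization errors during data processing.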

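### 4. Write the Metric's Getter, Setter, and Operation Methods

To operate on the `numDeserializeError` counter from outside the class, write the corresponding getter and operation methods. This example provides an `incNumDeserializeError` method that increments the counter, to be called when an exception occurs during deserialization:

    public void incNumDeserializeError() {
        if (numDeserializeError != null) {
            numDeserializeError.inc();
        }
    }

With this operation method in place, `incNumDeserializeError` can be called to increment the error count whenever deserialization fails.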

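### 5. Add the New Metric Output in the `toString` Method

To aid debugging and monitoring, and to keep the code complete, add the custom metric to the output of the `toString` method:

    @Override
    public String toString() {
        return "SourceMetricData{"
                // ... other existing fields ...
                + ", numDeserializeError=" + numDeserializeError.getCount()
                + "}";
    }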


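### 6. Insert the Custom Metric in Appropriate Places

After registration and initialization in the metric class, invoke the metric at the appropriate logic node. In this example, call `incNumDeserializeError` in the deserialization method to record each deserialization error, as the following code shows:

    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        try {
            // Execute deserialization logic
        } catch (Exception e) {
            // Increment error count on deserialization failure
            // sourceExactlyMetric must be checked for null
            if (sourceExactlyMetric != null) {
                sourceExactlyMetric.incNumDeserializeError();
            }
            throw e;
        }
    }

Calling `incNumDeserializeError` during deserialization ensures that every failure increments the error count, so the metric accurately reflects how often deserialization errors occur.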

## Testing and Verification

Use the `sort-end-to-end-tests` module, located in the `inlong-sort/sort-end-to-end-tests/` directory.

1. **Set Metric Labels in SQL**: Add the `inlong.metric.labels` option to the test SQL file so that Flink recognizes the metric labels. Take `sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql`, used by `sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java`, as an example:

```sql
CREATE TABLE test_input1 (
`id` INT primary key,
name STRING,
description STRING
) WITH (
'connector' = 'postgres-cdc-inlong',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'test',
'table-name' = 'test_input1',
'schema-name' = 'public',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'inlong_slot',
'debezium.slot.name' = 'inlong_slot',
-- Added portion
'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
);

-- Keep the sink part unchanged
```

The rest of the Flink SQL stays unchanged.

2. **Configure Log Output for Metric Viewing**: Enable metric log output in the test environment configuration so that the results appear on the console:

```properties
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 5 SECONDS
```

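3. **Run the End-to-End Test and Verify Output**: Run the specified end-to-end test from `inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15` with the following command, then check on the console whether `numDeserializeError` has the expected value:
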
```bash
mvn test -Dtest=Postgres2StarRocksTest
```

Tip: Consider adding some logic or constructing data that triggers `incNumDeserializeError()` to confirm that the metric works correctly.

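## Notes

* **Always Pass a `MetricGroup` When Creating Metrics**: When creating a `SourceExactlyMetric` or `SinkExactlyMetric` object, obtain the `MetricGroup` via `runtimeContext` and pass it to the constructor; otherwise metric registration may fail.
* **Confirm That `MetricOption` Is Non-Null**: Check that `MetricOption` is non-null before creating metric objects, to avoid null pointer exceptions caused by a missing `inlong.metric.labels`.
* **Handle Null Pointers**: When operating on a custom Flink metric of a `SourceExactlyMetric` or `SinkExactlyMetric` object, for example when calling `sourceExactlyMetric.incNumDeserializeSuccess()`, check that both the `SourceExactlyMetric` object and the corresponding `Counter` (`numDeserializeSuccess`) are non-null, to avoid null pointer exceptions when `'inlong.metric.labels'` is not specified in the Flink SQL.
* **End-to-End Test Coverage**: If the connector that gains a new metric is not covered by an end-to-end test, write one yourself to make sure the metric is reported correctly.

In this way, you can insert custom Flink metrics into an InLong Sort connector and verify through testing that they work, improving observability.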
