Skip to content

Commit

Permalink
fix: let pass metricGroup to SourceExactlyMetric to allow metrics reg…
Browse files Browse the repository at this point in the history
…istration
  • Loading branch information
PeterZh6 committed Sep 18, 2024
1 parent 851023e commit 8b7a3f5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
Expand Down Expand Up @@ -202,22 +203,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private SourceExactlyMetric sourceExactlyMetric;
/** Self-defined Flink metrics. */
private transient SourceExactlyMetric sourceExactlyMetric;

// record the start time of each checkpoint
private final transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();
private final MetricOption metricOption;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator,
MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
Expand All @@ -230,9 +236,16 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
// get sourceExactlyMetric from deserializer to record metrics
if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) {
sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric();
if (sourceExactlyMetric == null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
// instantiate checkpointStartTimeMap after restoring from checkpoint
if (checkpointStartTimeMap == null) {
checkpointStartTimeMap = new HashMap<>();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.postgresql.PostgresConnector;

Expand Down Expand Up @@ -53,6 +55,7 @@ public static class Builder<T> {
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported
Expand Down Expand Up @@ -146,6 +149,11 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
Expand Down Expand Up @@ -178,7 +186,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
PostgreSQLDeserializationConverterFactory.instance())
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.setChangelogMode(changelogMode)
.setMetricOption(metricOption)
.build();
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
Expand All @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

Expand All @@ -35,7 +34,7 @@
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import lombok.Getter;
import lombok.Setter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -101,8 +100,9 @@ public interface ValueValidator extends Serializable {

/** Changelog Mode to use for encoding changes in Flink internal data structure. */
private final DebeziumChangelogMode changelogMode;
private final MetricOption metricOption;
@Getter

/** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */
@Setter
private SourceExactlyMetric sourceExactlyMetric;

/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
Expand All @@ -117,8 +117,7 @@ public static Builder newBuilder() {
ValueValidator validator,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
DebeziumChangelogMode changelogMode,
MetricOption metricOption) {
DebeziumChangelogMode changelogMode) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
this.physicalConverter =
Expand All @@ -129,14 +128,10 @@ public static Builder newBuilder() {
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.validator = checkNotNull(validator);
this.changelogMode = checkNotNull(changelogMode);
this.metricOption = metricOption;
}

@Override
public void open() {
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}

@Override
Expand All @@ -152,7 +147,8 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
}
}

private void doDeserialize(SourceRecord record, Collector<RowData> out, long deseializeStartTime) throws Exception {
private void doDeserialize(SourceRecord record, Collector<RowData> out, long deserializeStartTime)
throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Expand Down Expand Up @@ -190,7 +186,7 @@ private void doDeserialize(SourceRecord record, Collector<RowData> out, long des
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime);
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}
}

Expand Down Expand Up @@ -237,7 +233,6 @@ public static class Builder {
private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
DeserializationRuntimeConverterFactory.DEFAULT;
private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
private MetricOption metricOption;

public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
Expand Down Expand Up @@ -269,10 +264,6 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
this.changelogMode = changelogMode;
return this;
}
public Builder setMetricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public RowDataDebeziumDeserializeSchema build() {
return new RowDataDebeziumDeserializeSchema(
Expand All @@ -282,8 +273,7 @@ public RowDataDebeziumDeserializeSchema build() {
validator,
serverTimeZone,
userDefinedConverterFactory,
changelogMode,
metricOption);
changelogMode);
}
}

Expand Down

0 comments on commit 8b7a3f5

Please sign in to comment.