diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 8fd698fa126..ff3c6946a4c 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -67,6 +67,20 @@ public final class Constants { public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag"; + public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag"; + + public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess"; + + public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError"; + + public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate"; + + public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError"; + + public static final String NUM_COMPLETED_SNAPSHOTS = "numCompletedSnapshots"; + + public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag"; + /** * Timestamp when the read phase changed */ diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index 19f9f1eda92..6f32d15f767 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -33,12 +33,19 @@ import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION; import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG; import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG; +import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; +import static org.apache.inlong.sort.base.Constants.NUM_COMPLETED_SNAPSHOTS; +import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR; +import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR; +import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG; import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter { @@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr private Counter numBytesIn; private Counter numRecordsInForMeter; private Counter numBytesInForMeter; + private Counter numDeserializeSuccess; + private Counter numDeserializeError; + private Gauge deserializeTimeLag; + private Counter numSnapshotCreate; + private Counter numSnapshotError; + private Counter numCompletedSnapshots; + private Gauge snapshotToCheckpointTimeLag; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; private AuditReporterImpl auditReporter; @@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr */ private volatile long emitDelay = 0L; + /** + * deserializeDelay = deserializeEndTime - deserializeStartTime, where the deserializeStartTime is the time method deserialize is called, + * and deserializeEndTime is the time the record is emitted + */ + private volatile long deserializeDelay = 0L; + + /** + * snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where the snapShotCompleteTime is the time the logic of notifyCheckpointComplete is finished + */ + private volatile long snapshotToCheckpointDelay = 0L; + public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; this.labels = option.getLabels(); @@ -98,6 +123,12 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForNumRecordsInPerSecond(); registerMetricsForCurrentFetchEventTimeLag(); registerMetricsForCurrentEmitEventTimeLag(); + registerMetricsForDeserializeTimeLag(); + registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter()); + registerMetricsForNumDeserializeError(new ThreadSafeCounter()); + registerMetricsForNumSnapshotCreate(new ThreadSafeCounter()); + registerMetricsForNumSnapshotError(new ThreadSafeCounter()); + registerMetricsForSnapshotToCheckpointTimeLag(); break; } @@ -178,6 +209,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() { public void registerMetricsForCurrentEmitEventTimeLag() { currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge) this::getEmitDelay); } + public void registerMetricsForDeserializeTimeLag() { + deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, (Gauge) this::getDeserializeDelay); + } + + public void registerMetricsForNumDeserializeSuccess(Counter counter) { + numDeserializeSuccess = registerCounter(NUM_DESERIALIZE_SUCCESS, counter); + } + + public void registerMetricsForNumDeserializeError(Counter counter) { + numDeserializeError = registerCounter(NUM_DESERIALIZE_ERROR, counter); + } + + public void registerMetricsForNumSnapshotCreate(Counter counter) { + numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter); + } + + public void registerMetricsForNumSnapshotError(Counter counter) { + numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); + } + + public void registerMetricsForNumCompletedCheckpoints(Counter counter) { + numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter); + } + + public void registerMetricsForSnapshotToCheckpointTimeLag() { + snapshotToCheckpointTimeLag = + registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge) this::getSnapshotToCheckpointDelay); + } + + public Gauge getDeserializeTimeLag() { + return deserializeTimeLag; + } + + public Gauge getSnapshotToCheckpointTimeLag() { + return snapshotToCheckpointTimeLag; + } + + public Counter getNumDeserializeSuccess() { + return numDeserializeSuccess; + } + + public Counter getNumDeserializeError() { + return numDeserializeError; + } + + public Counter getNumSnapshotCreate() { + return numSnapshotCreate; + } + + public Counter getNumSnapshotError() { + return numSnapshotError; + } public Counter getNumRecordsIn() { return numRecordsIn; @@ -211,6 +294,26 @@ public long getEmitDelay() { return emitDelay; } + public long getDeserializeDelay() { + return deserializeDelay; + } + + public long getSnapshotToCheckpointDelay() { + return snapshotToCheckpointDelay; + } + + public Counter getNumCompletedSnapshots() { + return numCompletedSnapshots; + } + + public void recordDeserializeDelay(long deserializeDelay) { + this.deserializeDelay = deserializeDelay; + } + + public void recordSnapshotToCheckpointDelay(long snapshotToCheckpointDelay) { + this.snapshotToCheckpointDelay = snapshotToCheckpointDelay; + } + @Override public MetricGroup getMetricGroup() { return metricGroup; @@ -262,6 +365,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) { } } + public void incNumDeserializeSuccess() { + if (numDeserializeSuccess != null) { + numDeserializeSuccess.inc(); + } + } + + public void incNumDeserializeError() { + if (numDeserializeError != null) { + numDeserializeError.inc(); + } + } + + public void incNumSnapshotCreate() { + if (numSnapshotCreate != null) { + numSnapshotCreate.inc(); + } + } + + public void incNumSnapshotError() { + if (numSnapshotError != null) { + numSnapshotError.inc(); + } + } + + public void incNumCompletedSnapshots() { + if (numCompletedSnapshots != null) { + numCompletedSnapshots.inc(); + } + } + /** * flush audit data * usually call this method in close method or when checkpointing @@ -292,6 +425,14 @@ public String toString() { + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue() + ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue() + + ", deserializeTimeLag=" + deserializeTimeLag.getValue() + + ", numDeserializeSuccess=" + numDeserializeSuccess.getCount() + + ", numDeserializeError=" + numDeserializeError.getCount() + + ", numSnapshotCreate=" + numSnapshotCreate.getCount() + + ", numSnapshotError=" + numSnapshotError.getCount() + + ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue() + + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate() + + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + ", auditReporter=" + auditReporter + '}'; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 5efc6c6ea5d..5ef6abacb98 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; + import com.ververica.cdc.debezium.Validator; import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; import com.ververica.cdc.debezium.internal.DebeziumOffset; @@ -62,6 +64,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -198,6 +202,11 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; + private SourceExactlyMetric sourceExactlyMetric; + + // record the start time of each checkpoint + private final transient Map checkpointStartTimeMap = new HashMap<>(); + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -221,6 +230,10 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); + // get sourceExactlyMetric from deserializer to record metrics + if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) { + sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric(); + } } // ------------------------------------------------------------------------ @@ -305,6 +318,17 @@ private void restoreHistoryRecordsState() throws Exception { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + try { + doSnapshotState(functionSnapshotContext); + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotError(); + } + throw e; + } + } + + private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { if (handover.hasError()) { LOG.debug("snapshotState() called on closed source"); throw new FlinkRuntimeException( @@ -317,6 +341,10 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw ((RowDataDebeziumDeserializeSchema) deserializer) .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } + // record the start time of each checkpoint + long checkpointId = functionSnapshotContext.getCheckpointId(); + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + sourceExactlyMetric.incNumSnapshotCreate(); } private void snapshotOffsetState(long checkpointId) throws Exception { @@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) { schema.flushAudit(); schema.updateLastCheckpointId(checkpointId); } + // get the start time of the currently completed checkpoint + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null) { + sourceExactlyMetric.incNumCompletedSnapshots(); + sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index fdf2d013279..f5075ed4e7a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -35,6 +35,7 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; +import lombok.Getter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -101,6 +102,9 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; private final MetricOption metricOption; + + // Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction + @Getter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -139,6 +143,18 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { + long deseializeStartTime = System.currentTimeMillis(); + try { + doDeserialize(record, out, deseializeStartTime); + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeError(); + } + throw e; + } + } + + private void doDeserialize(SourceRecord record, Collector out, long deseializeStartTime) throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); @@ -174,6 +190,10 @@ public void deserialize(SourceRecord record, Collector out) throws Exce } emit(record, after, out); } + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime); + } } private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {