Skip to content

Commit

Permalink
[Feature][Sort] Enhanced Metric Instrumentation for InLong Sort Flink…
Browse files Browse the repository at this point in the history
… Connector
  • Loading branch information
PeterZh6 committed Sep 16, 2024
1 parent 179e478 commit 02cc2bf
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ public final class Constants {

public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";

public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag";

public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess";

public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError";

public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate";

public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError";

public static final String NUM_COMPLETED_SNAPSHOTS = "numCompletedSnapshots";

public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag";

/**
* Timestamp when the read phase changed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_COMPLETED_SNAPSHOTS;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR;
import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter {
Expand All @@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
private Counter numBytesIn;
private Counter numRecordsInForMeter;
private Counter numBytesInForMeter;
private Counter numDeserializeSuccess;
private Counter numDeserializeError;
private Gauge<Long> deserializeTimeLag;
private Counter numSnapshotCreate;
private Counter numSnapshotError;
private Counter numCompletedSnapshots;
private Gauge<Long> snapshotToCheckpointTimeLag;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
private AuditReporterImpl auditReporter;
Expand Down Expand Up @@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
*/
private volatile long emitDelay = 0L;

/**
* deserializeDelay = deserializeEndTime - deserializeStartTime, where the deserializeStartTime is the time method deserialize is called,
* and deserializeEndTime is the time the record is emitted
*/
private volatile long deserializeDelay = 0L;

/**
* snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where the snapShotCompleteTime is the time the logic of notifyCheckpointComplete is finished
*/
private volatile long snapshotToCheckpointDelay = 0L;

public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.labels = option.getLabels();
Expand All @@ -98,6 +123,12 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForNumRecordsInPerSecond();
registerMetricsForCurrentFetchEventTimeLag();
registerMetricsForCurrentEmitEventTimeLag();
registerMetricsForDeserializeTimeLag();
registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
registerMetricsForNumDeserializeError(new ThreadSafeCounter());
registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
registerMetricsForNumSnapshotError(new ThreadSafeCounter());
registerMetricsForSnapshotToCheckpointTimeLag();
break;
}

Expand Down Expand Up @@ -178,6 +209,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() {
public void registerMetricsForCurrentEmitEventTimeLag() {
currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge<Long>) this::getEmitDelay);
}
public void registerMetricsForDeserializeTimeLag() {
deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, (Gauge<Long>) this::getDeserializeDelay);
}

public void registerMetricsForNumDeserializeSuccess(Counter counter) {
numDeserializeSuccess = registerCounter(NUM_DESERIALIZE_SUCCESS, counter);
}

public void registerMetricsForNumDeserializeError(Counter counter) {
numDeserializeError = registerCounter(NUM_DESERIALIZE_ERROR, counter);
}

public void registerMetricsForNumSnapshotCreate(Counter counter) {
numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter);
}

public void registerMetricsForNumSnapshotError(Counter counter) {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
}

public void registerMetricsForNumCompletedCheckpoints(Counter counter) {
numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter);
}

public void registerMetricsForSnapshotToCheckpointTimeLag() {
snapshotToCheckpointTimeLag =
registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge<Long>) this::getSnapshotToCheckpointDelay);
}

public Gauge getDeserializeTimeLag() {
return deserializeTimeLag;
}

public Gauge getSnapshotToCheckpointTimeLag() {
return snapshotToCheckpointTimeLag;
}

public Counter getNumDeserializeSuccess() {
return numDeserializeSuccess;
}

public Counter getNumDeserializeError() {
return numDeserializeError;
}

public Counter getNumSnapshotCreate() {
return numSnapshotCreate;
}

public Counter getNumSnapshotError() {
return numSnapshotError;
}

public Counter getNumRecordsIn() {
return numRecordsIn;
Expand Down Expand Up @@ -211,6 +294,26 @@ public long getEmitDelay() {
return emitDelay;
}

public long getDeserializeDelay() {
return deserializeDelay;
}

public long getSnapshotToCheckpointDelay() {
return snapshotToCheckpointDelay;
}

public Counter getNumCompletedSnapshots() {
return numCompletedSnapshots;
}

public void recordDeserializeDelay(long deserializeDelay) {
this.deserializeDelay = deserializeDelay;
}

public void recordSnapshotToCheckpointDelay(long snapshotToCheckpointDelay) {
this.snapshotToCheckpointDelay = snapshotToCheckpointDelay;
}

@Override
public MetricGroup getMetricGroup() {
return metricGroup;
Expand Down Expand Up @@ -262,6 +365,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
}
}

public void incNumDeserializeSuccess() {
if (numDeserializeSuccess != null) {
numDeserializeSuccess.inc();
}
}

public void incNumDeserializeError() {
if (numDeserializeError != null) {
numDeserializeError.inc();
}
}

public void incNumSnapshotCreate() {
if (numSnapshotCreate != null) {
numSnapshotCreate.inc();
}
}

public void incNumSnapshotError() {
if (numSnapshotError != null) {
numSnapshotError.inc();
}
}

public void incNumCompletedSnapshots() {
if (numCompletedSnapshots != null) {
numCompletedSnapshots.inc();
}
}

/**
* flush audit data
* usually call this method in close method or when checkpointing
Expand Down Expand Up @@ -292,6 +425,14 @@ public String toString() {
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue()
+ ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue()
+ ", deserializeTimeLag=" + deserializeTimeLag.getValue()
+ ", numDeserializeSuccess=" + numDeserializeSuccess.getCount()
+ ", numDeserializeError=" + numDeserializeError.getCount()
+ ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+ ", numSnapshotError=" + numSnapshotError.getCount()
+ ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", auditReporter=" + auditReporter
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -62,6 +64,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -198,6 +202,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private SourceExactlyMetric sourceExactlyMetric;

// record the start time of each checkpoint
private final transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
Expand All @@ -221,6 +230,10 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
// get sourceExactlyMetric from deserializer to record metrics
if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) {
sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric();
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -305,6 +318,17 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
try {
doSnapshotState(functionSnapshotContext);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotError();
}
throw e;
}
}

private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
Expand All @@ -317,6 +341,10 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
// record the start time of each checkpoint
long checkpointId = functionSnapshotContext.getCheckpointId();
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
sourceExactlyMetric.incNumSnapshotCreate();
}

private void snapshotOffsetState(long checkpointId) throws Exception {
Expand Down Expand Up @@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import lombok.Getter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -101,6 +102,9 @@ public interface ValueValidator extends Serializable {
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
private final DebeziumChangelogMode changelogMode;
private final MetricOption metricOption;

// Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction
@Getter
private SourceExactlyMetric sourceExactlyMetric;

/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
Expand Down Expand Up @@ -139,6 +143,18 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
long deseializeStartTime = System.currentTimeMillis();
try {
doDeserialize(record, out, deseializeStartTime);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}
}

private void doDeserialize(SourceRecord record, Collector<RowData> out, long deseializeStartTime) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Expand Down Expand Up @@ -174,6 +190,10 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
}
emit(record, after, out);
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime);
}
}

private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Expand Down

0 comments on commit 02cc2bf

Please sign in to comment.