Skip to content

Commit

Permalink
[INLONG11201][Sort] Enhanced sink metric instrumentation for InLong S…
Browse files Browse the repository at this point in the history
…ort Flink Connector
  • Loading branch information
PeterZh6 committed Sep 26, 2024
1 parent e0d7f8d commit 9f8e616
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public final class Constants {

public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag";

public static final String NUM_SERIALIZE_SUCCESS = "numSerializeSuccess";

public static final String NUM_SERIALIZE_ERROR = "numSerializeError";

public static final String SERIALIZE_TIME_LAG = "serializeTimeLag";

/**
* Timestamp when the read phase changed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
Expand All @@ -39,6 +40,13 @@
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_SERIALIZE_ERROR;
import static org.apache.inlong.sort.base.Constants.NUM_SERIALIZE_SUCCESS;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_COMPLETE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR;
import static org.apache.inlong.sort.base.Constants.SERIALIZE_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

/**
Expand All @@ -59,11 +67,24 @@ public class SinkExactlyMetric implements MetricData, Serializable {
private Counter numBytesOutForMeter;
private Counter dirtyRecordsOut;
private Counter dirtyBytesOut;
private Counter numSerializeSuccess;
private Counter numSerializeError;
private Gauge<Long> serializeTimeLag;
/** number of attempts to create a snapshot, i.e. <code>snapshotState()</code> method call */
private Counter numSnapshotCreate;
/** number of errors when creating a snapshot */

private Counter numSnapshotError;
/** number of successful snapshot completions, i.e. <code>notifyCheckpointComplete</code> method call */
private Counter numSnapshotComplete;
private Gauge<Long> snapshotToCheckpointTimeLag;
private Meter numRecordsOutPerSecond;
private Meter numBytesOutPerSecond;
private List<Integer> auditKeys;
private Long currentCheckpointId = 0L;
private Long lastCheckpointId = 0L;
private Long snapshotToCheckpointDelay = 0L;
private Long serializeDelay = 0L;

public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
Expand All @@ -78,6 +99,8 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) {
case DIRTY:
registerMetricsForDirtyBytesOut(new ThreadSafeCounter());
registerMetricsForDirtyRecordsOut(new ThreadSafeCounter());
registerMetricForNumSerializeError(new ThreadSafeCounter());
registerMetricForNumSnapshotError(new ThreadSafeCounter());
break;
case NORMAL:
recordsOutCounter.inc(option.getInitRecords());
Expand All @@ -88,6 +111,11 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
registerMetricsForNumBytesOutPerSecond();
registerMetricsForNumRecordsOutPerSecond();
registerMetricForSnapshotToCheckpointTimeLag();
registerMetricForSerializeTimeLag();
registerMetricForNumSerializeSuccess(new ThreadSafeCounter());
registerMetricForNumSnapshotCreate(new ThreadSafeCounter());
registerMetricForNumSnapshotComplete(new ThreadSafeCounter());
break;
default:
recordsOutCounter.inc(option.getInitRecords());
Expand All @@ -102,6 +130,13 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
registerMetricsForNumBytesOutPerSecond();
registerMetricsForNumRecordsOutPerSecond();
registerMetricForSnapshotToCheckpointTimeLag();
registerMetricForSerializeTimeLag();
registerMetricForNumSerializeSuccess(new ThreadSafeCounter());
registerMetricForNumSerializeError(new ThreadSafeCounter());
registerMetricForNumSnapshotCreate(new ThreadSafeCounter());
registerMetricForNumSnapshotError(new ThreadSafeCounter());
registerMetricForNumSnapshotComplete(new ThreadSafeCounter());
break;

}
Expand Down Expand Up @@ -184,6 +219,55 @@ public void registerMetricsForDirtyBytesOut(Counter counter) {
dirtyBytesOut = registerCounter(DIRTY_BYTES_OUT, counter);
}

public void registerMetricForNumSerializeSuccess() {
numSerializeSuccess = registerCounter(NUM_SERIALIZE_SUCCESS, new SimpleCounter());
}

public void registerMetricForNumSerializeSuccess(Counter counter) {
numSerializeSuccess = registerCounter(NUM_SERIALIZE_SUCCESS, counter);
}

public void registerMetricForNumSerializeError() {
numSerializeError = registerCounter(NUM_SERIALIZE_ERROR, new SimpleCounter());
}

public void registerMetricForNumSerializeError(Counter counter) {
numSerializeError = registerCounter(NUM_SERIALIZE_ERROR, counter);
}

public void registerMetricForSerializeTimeLag() {
serializeTimeLag = registerGauge(SERIALIZE_TIME_LAG, (Gauge<Long>) this::getSerializeDelay);
}

public void registerMetricForNumSnapshotCreate() {
numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, new SimpleCounter());
}

public void registerMetricForNumSnapshotCreate(Counter counter) {
numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter);
}

public void registerMetricForNumSnapshotError() {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, new SimpleCounter());
}

public void registerMetricForNumSnapshotError(Counter counter) {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
}

public void registerMetricForNumSnapshotComplete() {
numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, new SimpleCounter());
}

public void registerMetricForNumSnapshotComplete(Counter counter) {
numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, counter);
}

public void registerMetricForSnapshotToCheckpointTimeLag() {
snapshotToCheckpointTimeLag =
registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge<Long>) this::getSnapshotToCheckpointDelay);
}

public Counter getNumRecordsOut() {
return numRecordsOut;
}
Expand All @@ -208,6 +292,50 @@ public Meter getNumBytesOutPerSecond() {
return numBytesOutPerSecond;
}

public Counter getNumSerializeSuccess() {
return numSerializeSuccess;
}

public Counter getNumSerializeError() {
return numSerializeError;
}

public Counter getNumSnapshotCreate() {
return numSnapshotCreate;
}

public Counter getNumSnapshotError() {
return numSnapshotError;
}

public Counter getNumSnapshotComplete() {
return numSnapshotComplete;
}

public Gauge<Long> getSerializeTimeLag() {
return serializeTimeLag;
}

public Gauge<Long> getSnapshotToCheckpointTimeLag() {
return snapshotToCheckpointTimeLag;
}

public Long getSnapshotToCheckpointDelay() {
return snapshotToCheckpointDelay;
}

public Long getSerializeDelay() {
return serializeDelay;
}

public void recordSerializeDelay(Long delay) {
this.serializeDelay = delay;
}

public void recordSnapshotToCheckpointDelay(Long delay) {
this.snapshotToCheckpointDelay = delay;
}

@Override
public MetricGroup getMetricGroup() {
return metricGroup;
Expand All @@ -218,6 +346,36 @@ public Map<String, String> getLabels() {
return labels;
}

public void incNumSerializeSuccess() {
if (numSerializeSuccess != null) {
numSerializeSuccess.inc();
}
}

public void incNumSerializeError() {
if (numSerializeError != null) {
numSerializeError.inc();
}
}

public void incNumSnapshotCreate() {
if (numSnapshotCreate != null) {
numSnapshotCreate.inc();
}
}

public void incNumSnapshotError() {
if (numSnapshotError != null) {
numSnapshotError.inc();
}
}

public void incNumSnapshotComplete() {
if (numSnapshotComplete != null) {
numSnapshotComplete.inc();
}
}

public Counter getNumRecordsOutForMeter() {
return numRecordsOutForMeter;
}
Expand Down Expand Up @@ -301,6 +459,8 @@ public String toString() {
+ ", labels=" + labels
+ ", dirtyRecords=" + dirtyRecordsOut.getCount()
+ ", dirtyBytes=" + dirtyBytesOut.getCount()
+ ", numSerializeError=" + numSerializeError.getCount()
+ ", numSnapshotError=" + numSnapshotError.getCount()
+ '}';
case NORMAL:
return "SinkMetricData{"
Expand All @@ -313,6 +473,11 @@ public String toString() {
+ ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+ ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+ ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+ ", serializeTimeLag=" + serializeTimeLag.getValue()
+ ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+ ", numSerializeSuccess=" + numSerializeSuccess.getCount()
+ ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+ ", numSnapshotComplete=" + numSnapshotComplete.getCount()
+ '}';
default:
return "SinkMetricData{"
Expand All @@ -327,6 +492,13 @@ public String toString() {
+ ", dirtyBytesOut=" + dirtyBytesOut.getCount()
+ ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+ ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+ ", serializeTimeLag=" + serializeTimeLag.getValue()
+ ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+ ", numSerializeSuccess=" + numSerializeSuccess.getCount()
+ ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+ ", numSnapshotComplete=" + numSnapshotComplete.getCount()
+ ", numSerializeError=" + numSerializeError.getCount()
+ ", numSnapshotError=" + numSnapshotError.getCount()
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct
private SchemaUtils schemaUtils;
private String stateKey;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap;

public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
TableSchema schema,
StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
Expand Down Expand Up @@ -208,12 +211,26 @@ public void invoke(T value, Context context)
flushLegacyData();

Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());

long serializeStartTime = System.currentTimeMillis();
String serializedData;
try {
serializedData = serializer.serialize(data);
} catch (Exception e) {
log.error("Failed to serialize data", e);
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSerializeError();
}
return;
}
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSerializeSuccess();
sinkExactlyMetric.recordSerializeDelay(System.currentTimeMillis() - serializeStartTime);
}
sinkManager.write(
null,
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
serializer.serialize(schemaUtils.filterOutTimeField(data)));
serializedData);

ouputMetrics(value, data);
}
Expand Down Expand Up @@ -243,6 +260,8 @@ public void open(Configuration parameters) {

commitTransaction(Long.MAX_VALUE);
log.info("Open sink function v2. {}", EnvUtils.getGitInformation());

checkpointStartTimeMap = new HashMap<>();
}

@Override
Expand All @@ -266,10 +285,16 @@ public void close() {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// record the start time of each checkpoint
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());

updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
sinkManager.flush();

if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSnapshotCreate();
}
return;
}

Expand All @@ -280,8 +305,15 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw

snapshotStates.clear();
snapshotStates.add(StarrocksSnapshotState.of(snapshotMap));
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSnapshotCreate();
}
} else {
sinkManager.abort(snapshot);
checkpointStartTimeMap.remove(functionSnapshotContext.getCheckpointId());
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSnapshotError();
}
throw new RuntimeException("Snapshot state failed by prepare");
}

Expand Down Expand Up @@ -343,6 +375,15 @@ public void notifyCheckpointComplete(long checkpointId) {
commitTransaction(checkpointId);
flushAudit();
updateLastCheckpointId(checkpointId);
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSnapshotComplete();
}
// get the start time of the currently completed checkpoint
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sinkExactlyMetric != null) {
sinkExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
}

private void commitTransaction(long checkpointId) {
Expand Down

0 comments on commit 9f8e616

Please sign in to comment.