Skip to content

Commit

Permalink
[INLONG-11355][Sort] Add new source metrics for sort-connector-mongod…
Browse files Browse the repository at this point in the history
…b-cdc-v1.15

Co-authored-by: yangyang-12-wq <[email protected]>
  • Loading branch information
PeterZh6 committed Oct 14, 2024
1 parent 12e9e1d commit 87220ee
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.mongodb;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -61,6 +64,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -197,17 +202,25 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

/**
 * Metric reporter for this source. Created in {@code open()} only when {@link #metricOption}
 * is non-null, so every use must be null-checked.
 */
private transient SourceExactlyMetric sourceExactlyMetric;

/** Metric configuration supplied through the constructor; may be {@code null}. */
private final MetricOption metricOption;

/**
 * Maps a checkpoint id to the wall-clock time (ms) at which {@code snapshotState()} ran for it.
 * The entry is removed in {@code notifyCheckpointComplete()} to compute the
 * snapshot-to-checkpoint delay.
 * NOTE(review): no removal path for checkpoints that never complete is visible here — confirm
 * whether aborted checkpoints can make this map grow.
 */
private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

/**
 * Creates the source function.
 *
 * @param deserializer converts Debezium change records into output elements
 * @param properties Debezium / connector properties
 * @param specificOffset offset to start from, or {@code null} if none is specified
 * @param validator validator supplied by the caller
 * @param metricOption metric configuration; may be {@code null}, in which case no
 *     {@code SourceExactlyMetric} is created when the function is opened
 */
public DebeziumSourceFunction(
        DebeziumDeserializationSchema<T> deserializer,
        Properties properties,
        @Nullable DebeziumOffset specificOffset,
        Validator validator,
        MetricOption metricOption) {
    this.deserializer = deserializer;
    this.properties = properties;
    this.specificOffset = specificOffset;
    this.validator = validator;
    this.metricOption = metricOption;
}

@Override
Expand All @@ -220,6 +233,14 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
this.checkpointStartTimeMap = new HashMap<>();
// set sourceExactlyMetric for deserializer
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer).setSourceExactlyMetric(sourceExactlyMetric);
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -304,17 +325,32 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
try {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();;
}
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}
}

Expand Down Expand Up @@ -496,6 +532,16 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,53 +137,66 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();

OperationType op = operationTypeFor(record);
BsonDocument documentKey =
checkNotNull(
extractBsonDocument(
value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
switch (op) {
case INSERT:
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case DELETE:
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case UPDATE:
// It’s null if another operation deletes the document
// before the lookup operation happens. Ignored it.
if (fullDocument == null) {
long deserializeStartTime = System.currentTimeMillis();
try {
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();

OperationType op = operationTypeFor(record);
BsonDocument documentKey =
checkNotNull(
extractBsonDocument(
value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
switch (op) {
case INSERT:
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
}
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case REPLACE:
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case INVALIDATE:
case DROP:
case DROP_DATABASE:
case RENAME:
case OTHER:
default:
break;
case DELETE:
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case UPDATE:
// It’s null if another operation deletes the document
// before the lookup operation happens. Ignored it.
if (fullDocument == null) {
break;
}
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case REPLACE:
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case INVALIDATE:
case DROP:
case DROP_DATABASE:
case RENAME:
case OTHER:
default:
break;
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}

} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}
}

Expand Down Expand Up @@ -827,4 +840,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/**
 * Sets the metric reporter used by this schema. Called by {@code DebeziumSourceFunction}
 * when it is opened.
 *
 * @param sourceExactlyMetric the metric reporter; may be {@code null} when the source
 *     function has no metric option, in which case metric reporting is skipped
 */
public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
this.sourceExactlyMetric = sourceExactlyMetric;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.mongodb;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
Expand All @@ -35,7 +37,11 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -76,6 +82,7 @@ public static class Builder<T> {
private String copyExistingPipeline;
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
Expand Down Expand Up @@ -243,6 +250,11 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/**
 * Sets the metric option that {@code build()} hands to the created
 * {@code DebeziumSourceFunction}.
 *
 * @param metricOption the metric configuration; may be {@code null}
 * @return this builder
 */
public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

/**
* The properties of mongodb kafka connector.
* https://docs.mongodb.com/kafka-connector/current/kafka-source
Expand Down Expand Up @@ -338,7 +350,7 @@ public DebeziumSourceFunction<T> build() {
MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value());

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
Optional.ofNullable(metricOption).ifPresent(builder::metricOption);

return SourceProvider.of(builder.build());
} else {
org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> builder =
org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
.hosts(hosts)
.deserializer(deserializer);
.deserializer(deserializer)
.metricOption(metricOption);

Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.mongodb.source;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;

import com.ververica.cdc.connectors.base.options.StartupOptions;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class MongoDBSourceBuilder<T> {

private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder<T> hosts(String hosts) {
Expand Down Expand Up @@ -189,6 +191,11 @@ public MongoDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> des
return this;
}

/**
 * Stores the metric option on this builder.
 * NOTE(review): how the stored option is consumed by {@code build()} is not visible here.
 *
 * @param metricOption the metric configuration
 * @return this builder
 */
public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

/**
* Build the {@link MongoDBSource}.
*
Expand Down

0 comments on commit 87220ee

Please sign in to comment.