diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java index f535082ea28..438d19983e1 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java @@ -17,8 +17,6 @@ package org.apache.inlong.sort.pulsar.source.reader; -import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema; - import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.ReaderOutput; @@ -34,6 +32,8 @@ import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; import org.apache.flink.core.io.InputStatus; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; +import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; @@ -70,6 +70,10 @@ public class PulsarOrderedSourceReader extends PulsarSourceReaderBase private final AtomicReference cursorCommitThrowable = new AtomicReference<>(); private final PulsarDeserializationSchema deserializationSchema; private ScheduledExecutorService cursorScheduler; + private SourceExactlyMetric sourceExactlyMetric; + + /** The map to store the start time of each checkpoint. */ + private transient Map checkpointStartTimeMap; public PulsarOrderedSourceReader( FutureCompletingBlockingQueue>> elementsQueue, @@ -90,6 +94,10 @@ public PulsarOrderedSourceReader( this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.cursorsOfFinishedSplits = new ConcurrentHashMap<>(); this.deserializationSchema = deserializationSchema; + // get SourceExactlyMetric instance from deserializationSchema + if(deserializationSchema instanceof PulsarTableDeserializationSchema) { + this.sourceExactlyMetric = ((PulsarTableDeserializationSchema)deserializationSchema).getSourceExactlyMetric(); + } } @Override @@ -131,25 +139,41 @@ protected void onSplitFinished(Map finishedSp @Override public List snapshotState(long checkpointId) { - if (deserializationSchema instanceof PulsarTableDeserializationSchema) { - ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId); - } - List splits = super.snapshotState(checkpointId); + try { + // record the start time of each checkpoint + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + } + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } - // Perform a snapshot for these splits. - Map cursors = - cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); - // Put the cursors of the active splits. - for (PulsarPartitionSplit split : splits) { - MessageId latestConsumedId = split.getLatestConsumedId(); - if (latestConsumedId != null) { - cursors.put(split.getPartition(), latestConsumedId); + if (deserializationSchema instanceof PulsarTableDeserializationSchema) { + ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId); + } + List splits = super.snapshotState(checkpointId); + + // Perform a snapshot for these splits. + Map cursors = + cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); + // Put the cursors of the active splits. + for (PulsarPartitionSplit split : splits) { + MessageId latestConsumedId = split.getLatestConsumedId(); + if (latestConsumedId != null) { + cursors.put(split.getPartition(), latestConsumedId); + } + } + // Put cursors of all the finished splits. + cursors.putAll(cursorsOfFinishedSplits); + + return splits; + } catch(Exception e) { + if(sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotError(); + throw e; } } - // Put cursors of all the finished splits. - cursors.putAll(cursorsOfFinishedSplits); - - return splits; + return null; } @Override @@ -170,6 +194,17 @@ public void notifyCheckpointComplete(long checkpointId) { pulsarTableDeserializationSchema.flushAudit(); pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId); } + // get the start time of the currently completed checkpoint + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotComplete(); + sourceExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); + } } catch (Exception e) { LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e); cursorCommitThrowable.compareAndSet(null, e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index c05f485af6f..e1235de885e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -17,10 +17,6 @@ package org.apache.inlong.sort.pulsar.table; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceExactlyMetric; - import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -28,10 +24,12 @@ import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import org.apache.pulsar.client.api.Message; import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -94,27 +92,39 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf @Override public void deserialize(Message message, Collector collector) throws IOException { - // Get the key row data - List keyRowData = new ArrayList<>(); - if (keyDeserialization != null) { - keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData)); + try { + long deserializeStartTime = System.currentTimeMillis(); + // Get the key row data + List keyRowData = new ArrayList<>(); + if (keyDeserialization != null) { + keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData)); + } + + // Get the value row data + List valueRowData = new ArrayList<>(); + + if (upsertMode && message.getData().length == 0) { + rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector); + return; + } + + MetricsCollector metricsCollector = + new MetricsCollector<>(collector, sourceExactlyMetric); + + valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData)); + + rowDataConverter.projectToProducedRowAndCollect( + message, keyRowData, valueRowData, metricsCollector); + if(sourceExactlyMetric != null) { + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); + sourceExactlyMetric.incNumDeserializeSuccess(); + } + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeError(); + } + throw e; } - - // Get the value row data - List valueRowData = new ArrayList<>(); - - if (upsertMode && message.getData().length == 0) { - rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector); - return; - } - - MetricsCollector metricsCollector = - new MetricsCollector<>(collector, sourceExactlyMetric); - - valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData)); - - rowDataConverter.projectToProducedRowAndCollect( - message, keyRowData, valueRowData, metricsCollector); } @Override @@ -139,4 +149,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** getter for PulsarSourceReader to record metrics */ + public SourceExactlyMetric getSourceExactlyMetric() { + return sourceExactlyMetric; + } } \ No newline at end of file