Skip to content

Commit

Permalink
[INLONG-11340][Sort]Added new source metrics for sort-connector-pulsa…
Browse files Browse the repository at this point in the history
…r-v1.15 apache#11340
  • Loading branch information
PeterZh6 committed Oct 11, 2024
1 parent c37bf2a commit d1bd3ab
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ public void incNumDeserializeSuccess() {
}
}

public void decNumDeserializeSuccess() {
if (numDeserializeSuccess != null) {
numDeserializeSuccess.dec();
}
}

public void incNumDeserializeError() {
if (numDeserializeError != null) {
numDeserializeError.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;

import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -70,6 +71,10 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
private final AtomicReference<Throwable> cursorCommitThrowable = new AtomicReference<>();
private final PulsarDeserializationSchema<OUT> deserializationSchema;
private ScheduledExecutorService cursorScheduler;
private SourceExactlyMetric sourceExactlyMetric;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap;

public PulsarOrderedSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
Expand All @@ -90,6 +95,12 @@ public PulsarOrderedSourceReader(
this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
this.deserializationSchema = deserializationSchema;
// get SourceExactlyMetric instance from deserializationSchema
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
this.sourceExactlyMetric =
((PulsarTableDeserializationSchema) deserializationSchema).getSourceExactlyMetric();
}
this.checkpointStartTimeMap = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -131,25 +142,40 @@ protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSp

@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
try {
// record the start time of each checkpoint
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}

// Perform a snapshot for these splits.
Map<TopicPartition, MessageId> cursors =
cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the cursors of the active splits.
for (PulsarPartitionSplit split : splits) {
MessageId latestConsumedId = split.getLatestConsumedId();
if (latestConsumedId != null) {
cursors.put(split.getPartition(), latestConsumedId);
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
}
// Put cursors of all the finished splits.
cursors.putAll(cursorsOfFinishedSplits);
List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);

// Perform a snapshot for these splits.
Map<TopicPartition, MessageId> cursors =
cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the cursors of the active splits.
for (PulsarPartitionSplit split : splits) {
MessageId latestConsumedId = split.getLatestConsumedId();
if (latestConsumedId != null) {
cursors.put(split.getPartition(), latestConsumedId);
}
}
// Put cursors of all the finished splits.
cursors.putAll(cursorsOfFinishedSplits);

return splits;
return splits;
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotError();
}
throw e;
}
}

@Override
Expand All @@ -170,6 +196,17 @@ public void notifyCheckpointComplete(long checkpointId) {
pulsarTableDeserializationSchema.flushAudit();
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
cursorCommitThrowable.compareAndSet(null, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;

import org.apache.flink.annotation.Internal;
Expand All @@ -41,6 +42,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
Expand All @@ -66,6 +68,11 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT
private final PulsarDeserializationSchema<OUT> deserializationSchema;
private boolean started = false;

private SourceExactlyMetric sourceExactlyMetric;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap;

public PulsarUnorderedSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
Supplier<PulsarUnorderedPartitionSplitReader<OUT>> splitReaderSupplier,
Expand All @@ -86,6 +93,11 @@ public PulsarUnorderedSourceReader(
this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
this.deserializationSchema = deserializationSchema;
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
this.sourceExactlyMetric =
((PulsarTableDeserializationSchema) deserializationSchema).getSourceExactlyMetric();
}
this.checkpointStartTimeMap = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -141,26 +153,38 @@ protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSp

@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
LOG.debug("Trigger the new transaction for downstream readers.");
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
List<PulsarPartitionSplit> splits =
((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager).snapshotState();
try {
// record the start time of each checkpoint
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
LOG.debug("Trigger the new transaction for downstream readers.");
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
List<PulsarPartitionSplit> splits =
((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager).snapshotState();

if (coordinatorClient == null) {
return splits;
}
// Snapshot the transaction status and commit it after checkpoint finishing.
List<TxnID> txnIDs =
transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
for (PulsarPartitionSplit split : splits) {
TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
if (uncommittedTransactionId != null) {
txnIDs.add(uncommittedTransactionId);
if (coordinatorClient == null) {
return splits;
}
// Snapshot the transaction status and commit it after checkpoint finishing.
List<TxnID> txnIDs =
transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
for (PulsarPartitionSplit split : splits) {
TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
if (uncommittedTransactionId != null) {
txnIDs.add(uncommittedTransactionId);
}
}
return splits;
} catch (Exception e) {
sourceExactlyMetric.incNumSnapshotError();
throw e;
}
return splits;
}

@Override
Expand Down Expand Up @@ -188,5 +212,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
pulsarTableDeserializationSchema.flushAudit();
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,51 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
keyDeserialization.open(context);
}
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption);
sourceExactlyMetric = new SourceExactlyMetric(metricOption, context.getMetricGroup());
}
valueDeserialization.open(context);
}

@Override
public void deserialize(Message<byte[]> message, Collector<RowData> collector)
throws IOException {
// Get the key row data
List<RowData> keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
try {
long deserializeStartTime = System.currentTimeMillis();
// increase the number of deserialize success first, if deserialize failed, decrease it
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
}
// Get the key row data
List<RowData> keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
}

// Get the value row data
List<RowData> valueRowData = new ArrayList<>();

if (upsertMode && message.getData().length == 0) {
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
return;
}

MetricsCollector<RowData> metricsCollector =
new MetricsCollector<>(collector, sourceExactlyMetric);

valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, metricsCollector);
if (sourceExactlyMetric != null) {
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
sourceExactlyMetric.decNumDeserializeSuccess();
}
throw e;
}

// Get the value row data
List<RowData> valueRowData = new ArrayList<>();

if (upsertMode && message.getData().length == 0) {
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
return;
}

MetricsCollector<RowData> metricsCollector =
new MetricsCollector<>(collector, sourceExactlyMetric);

valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, metricsCollector);
}

@Override
Expand All @@ -139,4 +155,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/** getter for PulsarSourceReader to record metrics */
public SourceExactlyMetric getSourceExactlyMetric() {
return sourceExactlyMetric;
}
}

0 comments on commit d1bd3ab

Please sign in to comment.