Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Oct 10, 2024
1 parent c37bf2a commit 00d2757
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
Expand All @@ -34,6 +32,8 @@
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -70,6 +70,10 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
private final AtomicReference<Throwable> cursorCommitThrowable = new AtomicReference<>();
private final PulsarDeserializationSchema<OUT> deserializationSchema;
private ScheduledExecutorService cursorScheduler;
private SourceExactlyMetric sourceExactlyMetric;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap;

public PulsarOrderedSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
Expand All @@ -90,6 +94,10 @@ public PulsarOrderedSourceReader(
this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
this.deserializationSchema = deserializationSchema;
// get SourceExactlyMetric instance from deserializationSchema
if(deserializationSchema instanceof PulsarTableDeserializationSchema) {
this.sourceExactlyMetric = ((PulsarTableDeserializationSchema)deserializationSchema).getSourceExactlyMetric();
}
}

@Override
Expand Down Expand Up @@ -131,25 +139,41 @@ protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSp

@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
try {
// record the start time of each checkpoint
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}

// Perform a snapshot for these splits.
Map<TopicPartition, MessageId> cursors =
cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the cursors of the active splits.
for (PulsarPartitionSplit split : splits) {
MessageId latestConsumedId = split.getLatestConsumedId();
if (latestConsumedId != null) {
cursors.put(split.getPartition(), latestConsumedId);
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);

// Perform a snapshot for these splits.
Map<TopicPartition, MessageId> cursors =
cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the cursors of the active splits.
for (PulsarPartitionSplit split : splits) {
MessageId latestConsumedId = split.getLatestConsumedId();
if (latestConsumedId != null) {
cursors.put(split.getPartition(), latestConsumedId);
}
}
// Put cursors of all the finished splits.
cursors.putAll(cursorsOfFinishedSplits);

return splits;
} catch(Exception e) {
if(sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotError();
throw e;
}
}
// Put cursors of all the finished splits.
cursors.putAll(cursorsOfFinishedSplits);

return splits;
return null;
}

@Override
Expand All @@ -170,6 +194,17 @@ public void notifyCheckpointComplete(long checkpointId) {
pulsarTableDeserializationSchema.flushAudit();
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
cursorCommitThrowable.compareAndSet(null, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@

package org.apache.inlong.sort.pulsar.table;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.pulsar.client.api.Message;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -94,27 +92,39 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
@Override
public void deserialize(Message<byte[]> message, Collector<RowData> collector)
throws IOException {
// Get the key row data
List<RowData> keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
try {
long deserializeStartTime = System.currentTimeMillis();
// Get the key row data
List<RowData> keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
}

// Get the value row data
List<RowData> valueRowData = new ArrayList<>();

if (upsertMode && message.getData().length == 0) {
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
return;
}

MetricsCollector<RowData> metricsCollector =
new MetricsCollector<>(collector, sourceExactlyMetric);

valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, metricsCollector);
if(sourceExactlyMetric != null) {
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
sourceExactlyMetric.incNumDeserializeSuccess();
}
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}

// Get the value row data
List<RowData> valueRowData = new ArrayList<>();

if (upsertMode && message.getData().length == 0) {
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
return;
}

MetricsCollector<RowData> metricsCollector =
new MetricsCollector<>(collector, sourceExactlyMetric);

valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, metricsCollector);
}

@Override
Expand All @@ -139,4 +149,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/** getter for PulsarSourceReader to record metrics */
public SourceExactlyMetric getSourceExactlyMetric() {
return sourceExactlyMetric;
}
}

0 comments on commit 00d2757

Please sign in to comment.