Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Oct 10, 2024
1 parent 00d2757 commit ec1df10
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
Expand All @@ -32,8 +35,6 @@
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -95,8 +96,9 @@ public PulsarOrderedSourceReader(
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
this.deserializationSchema = deserializationSchema;
// get SourceExactlyMetric instance from deserializationSchema
if(deserializationSchema instanceof PulsarTableDeserializationSchema) {
this.sourceExactlyMetric = ((PulsarTableDeserializationSchema)deserializationSchema).getSourceExactlyMetric();
if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
this.sourceExactlyMetric =
((PulsarTableDeserializationSchema) deserializationSchema).getSourceExactlyMetric();
}
}

Expand Down Expand Up @@ -167,8 +169,8 @@ public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
cursors.putAll(cursorsOfFinishedSplits);

return splits;
} catch(Exception e) {
if(sourceExactlyMetric != null) {
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotError();
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@

package org.apache.inlong.sort.pulsar.table;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.pulsar.client.api.Message;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -115,7 +117,7 @@ public void deserialize(Message<byte[]> message, Collector<RowData> collector)

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, metricsCollector);
if(sourceExactlyMetric != null) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
sourceExactlyMetric.incNumDeserializeSuccess();
}
Expand Down

0 comments on commit ec1df10

Please sign in to comment.