Skip to content

Commit

Permalink
[INLONG-11493][SDK] Inlong SDK Dirty Sink supports retry sending (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Nov 14, 2024
1 parent ed165b7 commit f493862
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dirtydata;

import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.text.StringEscapeUtils;

Expand All @@ -34,6 +35,9 @@ public class DirtyMessageWrapper {

private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private String delimiter;
@Builder.Default
@Getter
private int retryTimes = 0;

private String inlongGroupId;
private String inlongStreamId;
Expand Down Expand Up @@ -71,4 +75,8 @@ public String format() {
.add(StringEscapeUtils.escapeXSI(formatData))
.toString();
}

public void increaseRetry() {
retryTimes++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;

import com.google.common.base.Preconditions;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
@Builder
Expand All @@ -40,9 +44,14 @@ public class InlongSdkDirtySender {
private String authId;
private String authKey;
private boolean ignoreErrors;
private int maxRetryTimes;
private int maxCallbackSize;
@Builder.Default
private boolean closed = false;

private SendMessageCallback callback;
private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
private DefaultMessageSender sender;
private Executor executor;

public void init() throws Exception {
Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null");
Expand All @@ -51,45 +60,79 @@ public void init() throws Exception {
Preconditions.checkNotNull(authId, "authId cannot be null");
Preconditions.checkNotNull(authKey, "authKey cannot be null");

this.callback = new LogCallBack();
ProxyClientConfig proxyClientConfig =
new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey);
proxyClientConfig.setReadProxyIPFromLocal(false);
proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);
this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
this.sender.setMsgtype(7);

this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
this.executor = Executors.newSingleThreadExecutor();
executor.execute(this::doSendDirtyMessage);
log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId);
}

public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
throws ProxysdkException {
sender.asyncSendMessage(inlongGroupId, inlongStreamId, messageWrapper.format().getBytes(), callback);
public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException {
dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
}

private void doSendDirtyMessage() {
while (!closed) {
try {
DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
if (messageWrapper == null) {
Thread.sleep(100L);
continue;
}
messageWrapper.increaseRetry();
if (messageWrapper.getRetryTimes() > maxRetryTimes) {
log.error("failed to send dirty message after {} times, dirty data ={}", maxRetryTimes,
messageWrapper);
continue;
}

sender.asyncSendMessage(inlongGroupId, inlongStreamId,
messageWrapper.format().getBytes(), new LogCallBack(messageWrapper));

} catch (Throwable t) {
log.error("failed to send inlong dirty message", t);
if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to inlong sdk failed", t);
}
}

}
}

public void close() {
closed = true;
dirtyDataQueue.clear();
if (sender != null) {
sender.close();
}
}

@Getter
class LogCallBack implements SendMessageCallback {

private final DirtyMessageWrapper wrapper;

public LogCallBack(DirtyMessageWrapper wrapper) {
this.wrapper = wrapper;
}

@Override
public void onMessageAck(SendResult result) {
if (result == SendResult.OK) {
return;
}
log.error("failed to send inlong dirty message, response={}", result);

if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result);
if (SendResult.OK != result) {
dirtyDataQueue.offer(wrapper);
}
}

@Override
public void onException(Throwable e) {
log.error("failed to send inlong dirty message", e);

if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to inlong sdk failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.base.dirty;

import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;

import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -63,6 +64,8 @@ public class DirtyData<T> {
* Dirty type
*/
private final DirtyType dirtyType;

private final DirtyServerType serverType;
/**
* Dirty describe message, it is the cause of dirty data
*/
Expand All @@ -85,10 +88,11 @@ public class DirtyData<T> {
private final T data;

public DirtyData(T data, String identifier, String labels,
String logTag, DirtyType dirtyType, String dirtyMessage,
String logTag, DirtyType dirtyType, DirtyServerType serverType, String dirtyMessage,
@Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
this.serverType = serverType;
this.dirtyMessage = dirtyMessage;
this.rowType = rowType;
Map<String, String> paramMap = genParamMap();
Expand Down Expand Up @@ -127,6 +131,10 @@ public DirtyType getDirtyType() {
return dirtyType;
}

public DirtyServerType getServerType() {
return serverType;
}

public String getIdentifier() {
return identifier;
}
Expand Down Expand Up @@ -154,6 +162,7 @@ public static class Builder<T> {
private String labels;
private String logTag;
private DirtyType dirtyType = DirtyType.UNDEFINED;
private DirtyServerType serverType = DirtyServerType.UNDEFINED;
private String dirtyMessage;
private LogicalType rowType;
private long dataTime;
Expand All @@ -175,6 +184,11 @@ public Builder<T> setDirtyType(DirtyType dirtyType) {
return this;
}

public Builder<T> setServerType(DirtyServerType serverType) {
this.serverType = serverType;
return this;
}

public Builder<T> setLabels(String labels) {
this.labels = labels;
return this;
Expand Down Expand Up @@ -206,7 +220,7 @@ public Builder<T> setRowType(LogicalType rowType) {
}

public DirtyData<T> build() {
return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
return new DirtyData<>(data, identifier, labels, logTag, dirtyType, serverType,
dirtyMessage, rowType, dataTime, extParams);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.sink;

public enum DirtyServerType {

UNDEFINED("Undefined"),
TUBE_MQ("TubeMQ"),
ICEBERG("Iceberg")

;

private final String format;

DirtyServerType(String format) {
this.format = format;
}

public String format() {
return format;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ public class InlongSdkDirtyOptions implements Serializable {
private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
private int retryTimes;
private int maxCallbackSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public void invoke(DirtyData<T> dirtyData) throws Exception {
Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
String dataGroupId = Preconditions.checkNotNull(labelMap.get("groupId"));
String dataStreamId = Preconditions.checkNotNull(labelMap.get("streamId"));
String serverType = Preconditions.checkNotNull(labelMap.get("serverType"));
String dataflowId = Preconditions.checkNotNull(labelMap.get("dataflowId"));

String dirtyMessage = formatData(dirtyData, labelMap);
Expand All @@ -68,7 +67,7 @@ public void invoke(DirtyData<T> dirtyData) throws Exception {
.inlongStreamId(dataStreamId)
.dataflowId(dataflowId)
.dataTime(dirtyData.getDataTime())
.serverType(serverType)
.serverType(dirtyData.getServerType().format())
.dirtyType(dirtyData.getDirtyType().format())
.dirtyMessage(dirtyData.getDirtyMessage())
.ext(dirtyData.getExtParams())
Expand Down Expand Up @@ -99,6 +98,8 @@ public void open(Configuration configuration) throws Exception {
.ignoreErrors(options.isIgnoreSideOutputErrors())
.inlongGroupId(options.getSendToGroupId())
.inlongStreamId(options.getSendToStreamId())
.maxRetryTimes(options.getRetryTimes())
.maxCallbackSize(options.getMaxCallbackSize())
.build();
dirtySender.init();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;

@Slf4j
public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
Expand Down Expand Up @@ -77,6 +78,12 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
.noDefaultValue()
.withDescription("The inlong stream id of dirty sink");

private static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE =
ConfigOptions.key("dirty.side-output.inlong-sdk.max-callback-size")
.intType()
.defaultValue(100000)
.withDescription("The inlong stream id of dirty sink");

@Override
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context) {
ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
Expand All @@ -95,8 +102,10 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
.enableDirtyLog(true)
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
.retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
.maxCallbackSize(config.get(DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE))
.enableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
Expand Down Expand Up @@ -143,6 +144,7 @@ public void deserialize(Message message, Collector<RowData> out) throws IOExcept

builder.setData(message.getData())
.setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
.setServerType(DirtyServerType.TUBE_MQ)
.setDirtyDataTime(dataTime)
.setExtParams(message.getAttribute())
.setLabels(dirtyOptions.getLabels())
Expand Down

0 comments on commit f493862

Please sign in to comment.