diff --git a/docs/development/binary_protocol/img/tubemq_frame.png b/docs/development/binary_protocol/img/tubemq_frame.png new file mode 100644 index 00000000000..28a388c3693 Binary files /dev/null and b/docs/development/binary_protocol/img/tubemq_frame.png differ diff --git a/docs/development/binary_protocol/img/tubemq_rpc.png b/docs/development/binary_protocol/img/tubemq_rpc.png new file mode 100644 index 00000000000..a76bd86951f Binary files /dev/null and b/docs/development/binary_protocol/img/tubemq_rpc.png differ diff --git a/docs/development/binary_protocol/img/tubemq_rpc_consumer.png b/docs/development/binary_protocol/img/tubemq_rpc_consumer.png new file mode 100644 index 00000000000..16eb8c61a5b Binary files /dev/null and b/docs/development/binary_protocol/img/tubemq_rpc_consumer.png differ diff --git a/docs/development/binary_protocol/img/tubemq_rpc_producer.png b/docs/development/binary_protocol/img/tubemq_rpc_producer.png new file mode 100644 index 00000000000..66495b9aa25 Binary files /dev/null and b/docs/development/binary_protocol/img/tubemq_rpc_producer.png differ diff --git a/docs/development/binary_protocol/tubemq_binary.md b/docs/development/binary_protocol/tubemq_binary.md new file mode 100644 index 00000000000..c8ae862a6dd --- /dev/null +++ b/docs/development/binary_protocol/tubemq_binary.md @@ -0,0 +1,466 @@ +--- +title: TubeMQ binary protocol +sidebar_position: 4 +--- + + +## Overview + +The various nodes (Client, Master, Broker) of the InLong TubeMQ module interact with each other in the form of TCP long connections, and use a custom binary encoding protocol to construct interactive request and response messages. This article mainly introduces the definition of the binary protocol and gives an example of how to complete the entire process of TubeMQ production and consumption interaction through this protocol. 
+ +## TubeMQ message format + +The following figure is a schematic diagram of the TubeMQ message format definition: + +![TubeMQ message frame](img/tubemq_frame.png) + +As shown in the figure above, each interactive message consists of three fixed parts: + +- MsgToken: this field is used to identify the legitimacy of the TubeMQ message. Each TubeMQ message carries the specified `RPC_PROTOCOL_BEGIN_TOKEN` parameter value. When the client receives a message that does not start with this field, the message is not a legitimate message sent by TubeMQ. The connection can then be closed according to the policy, followed by an error exit or a reconnection; + +- SerialNo: the message sequence number is generated by the requester and returned unchanged by the recipient of the request in the response message, so that the recipient of the response can associate the response with the corresponding request; + +- Message content part: this part is encoded by Protobuf and consists of several parts: + + - ListSize: 4 bytes, indicating the total number of data blocks after the data encoded by Protobuf is cut into blocks of a certain length. This field is not 0 under the current protocol definition; + + - `[]`: data block, composed of 2 fields, indicating the length of the data block sent and the data content, among which: + + - Length: identifies the length of the data block + + - Data: identifies the binary data content of the data block + +Why is the Protobuf (hereinafter referred to as PB) encoded data content defined in the form of `ListSize []`? + +The main reason is that in the initial implementation of TubeMQ, the serialized PB data is stored in ByteBuffer objects. The maximum block length of a single ByteBuffer in Java is 8196 bytes. 
PB message content exceeding the length of a single block is stored in multiple ByteBuffers; and when the data is serialized to the TCP message, the total length is not counted, and the ByteBuffer list serialized in PB is directly written into the message. +**When implementing in multiple languages, this needs special attention:** the PB data content needs to be serialized into a block array (there is corresponding support in the PB codec). + +The PB codec file of the message content is stored in the `org.apache.inlong.tubemq.corerpc` module. For detailed format definitions, refer to the relevant files. + +### PB format encoding + +The PB protocol is divided into three parts: + +- RPC framework definition: `RPC.proto` + +- Master-related message encoding: `MasterService.proto` + +- Broker-related message encoding: `BrokerService.proto` + +These protocol definition files are directly compiled through PB to obtain the corresponding implementation class. Taking RPC.proto as an example, RPC.proto defines 6 structures, which are divided into 2 types: + +- Request message + +- Response message, including normal response return and response return in case of exception + +The request message encoding and response message decoding can be implemented by referring to the `NettyClient.java` class. There is some room for improvement in the definition of this part, see [TUBEMQ-109](https://issues.apache.org/jira/browse/TUBEMQ-109) for details. However, due to compatibility considerations, it will be gradually replaced. According to the current proto version, interaction is not a problem at least before version 1.0.0, but the new protocol will be considered for 1.0.0. The protocol implementation module requires each SDK to reserve room for improvement. 
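The frame layout described under "TubeMQ message format" can be sketched as follows. This is a minimal illustration, not the SDK's codec: the class and method names are hypothetical, the begin token is passed in as a parameter rather than hard-coded, and the sketch assumes that MsgToken, SerialNo, ListSize and each Length are 4-byte big-endian integers (the width of Length and the byte order are assumptions, not stated in this section).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
public class FrameSketch {

    // Cut the serialized PB bytes into blocks of at most maxBlockSize bytes.
    static List<byte[]> split(byte[] pb, int maxBlockSize) {
        List<byte[]> blocks = new ArrayList<>();
        for (int off = 0; off < pb.length; off += maxBlockSize) {
            int len = Math.min(maxBlockSize, pb.length - off);
            byte[] block = new byte[len];
            System.arraycopy(pb, off, block, 0, len);
            blocks.add(block);
        }
        return blocks;
    }

    // Write MsgToken, SerialNo, ListSize, then each block as Length + Data.
    // No total length is written, matching the description above.
    static byte[] encode(int beginToken, int serialNo, List<byte[]> blocks) {
        int total = 12; // three assumed 4-byte header fields
        for (byte[] b : blocks) {
            total += 4 + b.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total); // big-endian by default
        buf.putInt(beginToken);
        buf.putInt(serialNo);
        buf.putInt(blocks.size());
        for (byte[] b : blocks) {
            buf.putInt(b.length);
            buf.put(b);
        }
        return buf.array();
    }
}
```

A decoder in another language would read the three header fields, then loop ListSize times reading a Length followed by that many bytes, and concatenate the blocks before handing them to the PB parser.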
+ +Taking the request message filling as an example, the RpcConnHeader and other related structures are as follows: + +```protobuf +message RpcConnHeader { + required int32 flag = 1; + optional int64 traceId = 2; + optional int64 spanId = 3; + optional int64 parentId = 4; +} + +message RequestHeader { + optional int32 serviceType = 1; + optional int32 protocolVer = 2; +} + +message RequestBody { + required int32 method = 1; + optional int64 timeout = 2; + optional bytes request = 3; +} +``` +Among them: + +- `RpcConnHeader`'s `flag` marks whether the message is requested, and the following three fields mark the relevant content of message tracking, which is not used at present; + +- `RequestHeader` contains information about service type and protocol version; + +- `RequestBody` contains request method, timeout, and request content, among which `timeout` is the maximum allowed waiting time from when a request is received by the server to when it is actually processed. If it exceeds, it will be discarded. The current default is 10 seconds. + +The specific implementation of request filling is shown in the following part: + +```java +RequestWrapper requestWrapper = + new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface), + RpcProtocol.RPC_PROTOCOL_VERSION, + RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST, + requestTimeout); // request timeout +``` + +At this point, the introduction to the protocol format definition of TubeMQ is complete. Next, we will complete data production and consumption with messages composed of these protocol formats. 
+ +## Client request response interaction + +### Producer production interaction diagram + +The Producer uses a total of 4 pairs of instructions: registering with the Master node, maintaining heartbeats, and exiting registration operations; interacting with the Broker node to report messages: + +![Producer RPC interaction](img/tubemq_rpc_producer.png) + +From here we can see that the Producer obtains metadata information such as the partition list corresponding to the specified Topic from the Master. After obtaining this information, it selects the partition according to the client's rules and sends the message to the corresponding Broker. + +Producer needs to pay attention to **multi-language implementation:** + +- The Master has active and standby nodes, and only the active node can provide services. When the Producer connects to the standby node, it will receive a `StandbyException` exception response. At this time, the client needs to select other Master nodes for registration, and finally select the active Master node for registration; + +- When the Master connection fails during the production process, such as timeout, passive disconnection of the link, etc., the Producer must initiate a re-registration request to the Master; + +- After receiving the metadata information of the Topic from the Master, the Producer must pre-connect to the Broker in advance to avoid a sudden increase in connection requests during data production that affects the message reporting performance; + +- The connection between the Producer and the Broker must be detected for anomalies: Broker failure nodes must be detected in long-term running scenarios, and links that have not sent messages for a long time must be recycled to improve the stability of the data reporting service. 
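The first note above, on trying other Master nodes when a standby one answers, can be sketched like this. It is a simplified illustration under stated assumptions: `StandbyException` here is a local stand-in class, `Registrar` is a hypothetical callback that wraps whatever registration RPC the SDK performs, and the addresses in the usage note are made up.

```java
import java.util.List;

// Hypothetical sketch of Master selection, for illustration only.
public class MasterFailoverSketch {

    // Local stand-in for the standby response described in the text.
    static class StandbyException extends Exception {
        StandbyException(String msg) {
            super(msg);
        }
    }

    // Hypothetical hook that performs the actual registration request.
    interface Registrar {
        void register(String masterAddr) throws StandbyException;
    }

    // Try each configured Master in turn and return the first address that
    // accepts the registration, or null if every node reported standby.
    static String registerToActive(List<String> masters, Registrar registrar) {
        for (String addr : masters) {
            try {
                registrar.register(addr);
                return addr;
            } catch (StandbyException e) {
                // This node is a standby Master; move on to the next one.
            }
        }
        return null;
    }
}
```

For example, calling `registerToActive` with the list `["master-a:8000", "master-b:8000"]` and a registrar that rejects `master-a` returns `"master-b:8000"`. A real client would also apply the same loop after a timeout or a dropped connection, as the second note requires.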
+ +### Consumer consumption interaction diagram + +Consumer uses a total of 8 pairs of instructions: registering with the Master, heartbeat, and deregistering; registering with the Broker, deregistering, heartbeat, pulling messages, and confirming messages; the registration and deregistration with the Broker use the same command name with different status codes to identify and distinguish different operations: + +![Consumer RPC interaction](img/tubemq_rpc_consumer.png) + +From the example in the figure above, we can see that: + +- When the Consumer registers with the main Master node, the Master does not return metadata information to the Consumer, but returns it in the subsequent heartbeat link. The reason is that the Consumer in the example uses the server-side load balancing mode, and needs to wait for the server to distribute the consumption partition information before obtaining the corresponding consumption partition; + +- There are registration and un-registration operations from Consumer to Broker. The reason is that the partition is exclusive consumption during consumption, that is, the same partition can only be consumed by one consumer in the same group at the same time. The client obtains the consumption rights of the partition through the registration operation; + +- Consumer message pulling and consumption confirmation need to appear in pairs. Through the secondary confirmation of data consumption, the problem of repeated consumption can be minimized as much as possible, and the problem of data being missed in abnormal situations can be solved. 
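One way to enforce that pulls and confirmations appear in pairs is to track, per partition, whether a pull is still waiting for its confirmation. The sketch below is only an illustration of that bookkeeping; the class, its methods and the partition key format are hypothetical and are not part of the TubeMQ SDK.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical bookkeeping for 1:1 pull/confirm pairing, for illustration only.
public class PullConfirmSketch {

    // Partitions that have an outstanding pull awaiting confirmation.
    private final Set<String> awaitingConfirm = new HashSet<>();

    // Returns true if a pull may be issued; false if the previous pull on
    // this partition has not been confirmed yet.
    boolean tryPull(String partitionKey) {
        return awaitingConfirm.add(partitionKey);
    }

    // Returns true if there was an outstanding pull to confirm.
    boolean confirm(String partitionKey) {
        return awaitingConfirm.remove(partitionKey);
    }
}
```

With this in place, a second `tryPull` on the same partition returns `false` until `confirm` has been called for it.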
+ +### The RPC interaction process between the client and the server + +As shown below: + +![TubeMQ RPC Implementation](img/tubemq_rpc.png) + +- When the client interacts with the TubeMQ server, it must maintain local storage of the sent request message until the RPC times out or a response message is received; + +- The client associates the SerialNo value carried in the response message with the previously cached sent request record; + +- After receiving the Broker and Topic metadata information from the Master, the client must save it locally and update it with the latest metadata, and report the cached metadata to the Master regularly; + +- The client must maintain the heartbeat of the Master or Broker. If the Master reports a registration timeout error, it must re-register; + +- The client must establish a connection based on the Broker, and the business is allowed to choose to establish a connection by object or by process between different objects in the same process. + + +#### Producer registers with the Master + +---------- + +```protobuf +message RegisterRequestP2M { + required string clientId = 1; + repeated string topicList = 2; + required int64 brokerCheckSum = 3; + required string hostName = 4; + optional MasterCertificateInfo authInfo = 5; + optional string jdkVersion = 6; + optional ApprovedClientConfig appdConfig = 7; +} + +message RegisterResponseM2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + required int64 brokerCheckSum = 4; + repeated string brokerInfos = 5; + optional MasterAuthorizedInfo authorizedInfo = 6; + optional ApprovedClientConfig appdConfig = 7; +} +``` + +- clientId:Identifies the Producer object. The ID value is constructed when the Producer is started and is valid during the Producer life cycle. 
The current construction rules of the Java version of the SDK are: + + ```java + ClientId = consumerGroup + "_" + + AddressUtils.getLocalAddress() + "_" // local ip (IPv4) + + pid + "_" // processId + + timestamp + "_" // timestamp + + counter + "_" // increment counter + + consumerType + "_" // type of consumer, including Pull and Push + + clientVersion; // version for client + ``` + It is recommended that other languages add the above mark to facilitate troubleshooting; + +- topicList: Identifies the topic list published by the user. The Producer will provide the initial topic list of the data to be published during initialization. During operation, the business is also allowed to add new topics later and to reduce the published topics through the Publish function; + +- brokerCheckSum: The check value of the Broker metadata information saved locally by the client. The Producer does not have this data locally during initial startup, so the value is -1; the SDK needs to carry the last brokerCheckSum value in each request, and the Master determines whether the client's metadata needs to be updated by comparing this value; + +- hostName: The IPv4 address value of the machine where the Producer is located; + +- success: Whether the operation is successful: true on success, false on failure; + +- errCode: Error code, combined with errMsg information to determine the specific cause of the error; + +- errMsg: Error message, if the request response fails, the SDK needs to print out the specific error message; + +- authInfo: Authentication and authorization information. Fill in this field only if the user configuration enables the authentication process. When authentication is required, report the signature derived from the username and password. While running, for example during a heartbeat, if the Master forces authentication, again report the signature derived from the username and password. 
If not, authenticate it according to the authorization token provided by the Master during the previous interaction; the authorization token is also used to carry the message production to the Broker during production. + +- brokerInfos: Broker metadata information. This field mainly contains the Broker information list of the entire cluster fed back by the Master to the Producer; its format is as follows: + + ```java + public BrokerInfo(String strBrokerInfo, int brokerPort) { + String[] strBrokers = + strBrokerInfo.split(TokenConstants.ATTR_SEP); + this.brokerId = Integer.parseInt(strBrokers[0]); + this.host = strBrokers[1]; + this.port = brokerPort; + if (!TStringUtils.isBlank(strBrokers[2])) { + this.port = Integer.parseInt(strBrokers[2]); + } + this.buildStrInfo(); + } + ``` + +- authorizedInfo:Master provides authorization information in the following format: + + ```protobuf + message MasterAuthorizedInfo { + required int64 visitAuthorizedToken = 1; + optional string authAuthorizedToken = 2; + } + ``` +- visitAuthorizedToken: Access authorization token, to prevent the client from bypassing the Master to access the Broker node. The SDK needs to save this information locally and carry this information when accessing the Broker in the future. If this field changes in the subsequent heartbeat, the locally cached data of this field needs to be updated; + +- authAuthorizedToken: Authorization token that has passed authentication. If there is data in this field, the SDK needs to save it and carry this field information when accessing the Master and Broker in the future. If this field changes in the subsequent heartbeat, the locally cached data of this field needs to be updated. 
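For SDKs in other languages, the `brokerInfos` parsing shown above can be sketched as a small standalone parser. This is an illustrative sketch, not the SDK's `BrokerInfo` class: the class name is hypothetical, and it assumes the attribute separator is `:` (following the `brokerId:host:port` layout noted in the heartbeat response comment). Unlike the constructor above, it also tolerates an entry that omits the port segment entirely.

```java
// Hypothetical standalone parser for one brokerInfos entry, for illustration only.
public class BrokerInfoSketch {
    final int brokerId;
    final String host;
    final int port;

    BrokerInfoSketch(int brokerId, String host, int port) {
        this.brokerId = brokerId;
        this.host = host;
        this.port = port;
    }

    // Parse "brokerId:host[:port]"; fall back to defaultPort when the port
    // segment is missing or blank.
    static BrokerInfoSketch parse(String strBrokerInfo, int defaultPort) {
        String[] parts = strBrokerInfo.split(":", -1);
        int brokerId = Integer.parseInt(parts[0].trim());
        String host = parts[1].trim();
        int port = defaultPort;
        if (parts.length > 2 && !parts[2].trim().isEmpty()) {
            port = Integer.parseInt(parts[2].trim());
        }
        return new BrokerInfoSketch(brokerId, host, port);
    }
}
```

For instance, `parse("1:10.0.0.1:8123", 8000)` yields broker 1 on port 8123, while `parse("2:10.0.0.2:", 8000)` falls back to port 8000.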
+ + +#### Producer to Master Heartbeat + +---------- + +```protobuf +message HeartRequestP2M { + required string clientId = 1; + required int64 brokerCheckSum = 2; + required string hostName = 3; + repeated string topicList = 4; + optional MasterCertificateInfo authInfo = 5; + optional ApprovedClientConfig appdConfig = 6; +} + +message HeartResponseM2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + required int64 brokerCheckSum = 4; + /* brokerId:host:port-topic:partitionNum */ + repeated string topicInfos = 5; + repeated string brokerInfos = 6; + optional bool requireAuth = 7; + optional MasterAuthorizedInfo authorizedInfo = 8; + optional ApprovedClientConfig appdConfig = 9; +} +``` + +- topicInfos: Topic metadata information published by the SDK, including partition information and the Broker node where it is located. The specific decoding method is as follows: + + ```java + public static Tuple2, List> convertTopicInfo( + Map brokerInfoMap, List strTopicInfos) { + List topicList = new ArrayList<>(); + Map topicMaxSizeInBMap = new ConcurrentHashMap<>(); + if (strTopicInfos == null || strTopicInfos.isEmpty()) { + return new Tuple2<>(topicMaxSizeInBMap, topicList); + } + String[] strInfo; + String[] strTopicInfoSet; + String[] strTopicInfo; + BrokerInfo brokerInfo; + for (String info : strTopicInfos) { + if (info == null || info.isEmpty()) { + continue; + } + info = info.trim(); + strInfo = info.split(TokenConstants.SEGMENT_SEP, -1); + strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); + for (String s : strTopicInfoSet) { + strTopicInfo = s.split(TokenConstants.ATTR_SEP); + brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); + if (brokerInfo != null) { + topicList.add(new TopicInfo(brokerInfo, + strInfo[0], Integer.parseInt(strTopicInfo[1]), + Integer.parseInt(strTopicInfo[2]), true, true)); + } + } + if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) { + continue; + } + try { + 
topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2])); + } catch (Throwable e) { + // + } + } + return new Tuple2<>(topicMaxSizeInBMap, topicList); + } + ``` + +- requireAuth: indicates that the previous authorized access code (authAuthorizedToken) of the Master has expired, requiring the SDK to carry the signature information of the username and password for authentication in the next request; + +#### Producer to Master Close and Exit + +---------- + +```protobuf +message CloseRequestP2M{ + required string clientId = 1; + optional MasterCertificateInfo authInfo = 2; +} + +message CloseResponseM2P{ + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; +} +``` +Noted that if authentication is turned on, authentication will be done when it is turned off to avoid external interference. + +#### Producer sends messages to Broker + +---------- + +The content of this section is mainly related to the definition of Message: + +```protobuf +message SendMessageRequestP2B { + required string clientId = 1; + required string topicName = 2; + required int32 partitionId = 3; + required bytes data = 4; + required int32 flag = 5; + required int32 checkSum = 6; + required int32 sentAddr = 7; + optional string msgType = 8; + optional string msgTime = 9; + optional AuthorizedInfo authInfo = 10; +} + +message SendMessageResponseB2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + optional bool requireAuth = 4; + optional int64 messageId = 5; + optional int64 appendTime = 6; + optional int64 appendOffset = 7; +} +``` +- data: Binary byte stream information of Message, implemented as follows: + + ```Java + private byte[] encodePayload(final Message message) { + final byte[] payload = message.getData(); + final String attribute = message.getAttribute(); + if (TStringUtils.isBlank(attribute)) { + return payload; + } + byte[] attrData = StringUtils.getBytesUtf8(attribute); + final ByteBuffer buffer = + 
ByteBuffer.allocate(4 + attrData.length + payload.length); + buffer.putInt(attrData.length); + buffer.put(attrData); + buffer.put(payload); + return buffer.array(); + } + ``` + +- sentAddr: IPv4 address of the local machine where the SDK is located. Here, the IP address is converted into a 32-bit integer; + +- msgType: The stream value to which the message belongs, used for filtering consumption; + +- msgTime: The time when the SDK sends a message. Its value comes from the value filled in by putSystemHeader when constructing the Message; + +- requireAuth: Whether authentication is required for data production to the Broker. For performance reasons, this check is not currently in effect. The authAuthorizedToken value filled in the sent message is based on the value provided by the Master side and changes with the Master side. + + +#### Partition Loadbalance + +---------- + +The InLong TubeMQ module currently supports two balancing modes: server-side load balancing and client-side balancing. The business can choose different balancing methods according to needs. + +The server balancing process is managed and maintained by the server, and the requirements for the Consumer consumption side are relatively low. The load balancing process is as follows: + +1. After the Master process is started, the load balancing thread balancerChore is started. BalancerChore periodically checks the currently registered consumer groups and performs load balancing. In simple terms, the process is to evenly distribute the partitions subscribed by the consumer group to the registered clients, and regularly check whether the current number of partitions of the client exceeds the predetermined number. If it does, the excess partitions are moved to other clients that hold fewer partitions. + +2. The Master checks whether the current consumer group needs to be load balanced. 
If necessary, all partitions of the Topic set subscribed by the consumer group and all consumer IDs of this consumer group are sorted, and then the number of partitions and the number of clients of the consumer group are divided and modulo to obtain the maximum number of partitions subscribed by each client; then partitions are allocated to each client, and the partition information is carried in the heartbeat response when the consumer subscribes; if the client currently has more partitions, a partition release instruction is given to the client to release the partition from the consumer, and a partition allocation instruction is given to the allocated consumer to inform the partition that the corresponding client is allocated. The specific instructions are as follows: + + ```protobuf + message EventProto{ + optional int64 rebalanceId = 1; + optional int32 opType = 2; + optional int32 status = 3; + /* consumerId@group-brokerId:host:port-topic:partitionId */ + repeated string subscribeInfo = 4; + } + ``` + Among them: + + - rebalanceId: self-incrementing long value ID, indicating the round of load balancing; + + - subscribeInfo: indicates the assigned partition information; + + - opType: operation code, the value is defined in EventType, and the currently implemented operation codes only have the following 4 parts: release connection, establish connection; only\_xxx is not expanded at present. 
After receiving the load balancing information carried in the heartbeat, the Consumer performs corresponding business operations according to this value; + + ```java + switch (event.getType()) { + case DISCONNECT: + case ONLY_DISCONNECT: + disconnectFromBroker(event); + rebalanceResults.put(event); + break; + case CONNECT: + case ONLY_CONNECT: + connect2Broker(event); + rebalanceResults.put(event); + break; + case REPORT: + reportSubscribeInfo(); + break; + case STOPREBALANCE: + break; + default: + throw new TubeClientException(strBuffer + .append("Invalid rebalance opCode:") + .append(event.getType()).toString()); + } + ``` + + - status: indicates the status of the event, defined in `EventStatus`: + + ```java + public enum EventStatus { + /** + * To be processed state. + * */ + TODO(0, "To be processed"), + /** + * On processing state. + * */ + PROCESSING(1, "Being processed"), + /** + * Processed state. + * */ + DONE(2, "Process Done"), + + /** + * Unknown state. + * */ + UNKNOWN(-1, "Unknown event status"), + /** + * Failed state. + * */ + FAILED(-2, "Process failed"); + } + ``` + +3. When the Master constructs the load balancing processing task, it sets the instruction status to TODO; when the client heartbeat request comes, the Master writes the task into the response message and sets the instruction status to PROCESSING; the client receives the load balancing instruction from the heartbeat response, performs the actual connection or disconnection operation, and after the operation is completed, sets the instruction status to DONE, and waits for the next heartbeat request to be sent back to the Master; + +4. 
Consumer operation: After the Consumer receives the metadata information returned by the Master, it establishes and releases the connection, see the opType annotation above, and after the connection is established, returns the event processing result to the Master, thereby completing the related operations of receiving tasks, executing tasks, and returning task processing results; it should be noted that load balancing registration is a best-effort operation. If the consumer initiates a connection operation, but the consumer that previously occupied the partition has not had time to exit, it will receive `PARTITION_OCCUPIED` The partition is deleted from the attempt queue at this time; the previous partition consumer will still perform the deletion operation after receiving the corresponding response, so that the consumer assigned to this partition in the next round of load balancing is successfully registered on the partition. + +At this point, the consumption balancing operation on the consumer side is completed, and the consumer registers and consumes data after obtaining the partition information. \ No newline at end of file diff --git a/docs/modules/tubemq/client_rpc.md b/docs/modules/tubemq/client_rpc.md deleted file mode 100644 index e7452d29e61..00000000000 --- a/docs/modules/tubemq/client_rpc.md +++ /dev/null @@ -1,445 +0,0 @@ ---- -title: Client RPC ---- - - -## 1 General Introduction - -Implements of this part can be found in `org.apache.tubemq.corerpc`. Each node in Apache TubeMQ Cluster Communicates by TCP Keep-Alive. Mseeages are definded using binary and protobuf combined. -![](img/client_rpc/rpc_bytes_def.png) - -All we can see in TCP are binary streams. We defined: -- msgToken: 4-bytes, `RPC_PROTOCOL_BEGIN_TOKEN` in header, which are used to distinguish each message and identify the legitimacy of the counterpart. 
When message client received is not started with this header field, client needs to close the connection and prompt the error and quit or reconnect because the protocal is not supported by TubeMQ or something wrong may happended. -- serialNo: 4-bytes, this field is sent by client to server and returned by server exactly the same when after handling the request. It is mainly used to associate the context of the client request and response. -- listSize: 4-bytes, the length of the following PB blocks, this field would not be 0 in current definition. -- `[]`: field is a combination of 2 fields - - len: the length of data - - data: the content of data - -> Why the format of `listSize []` ? -> -> This is because the serialized PB data is saved as a ByteBuffer object in TubeMQ, and in Java, there a maximum(8196) length of ByteBuffer block, an over length PB message needs to be saved in several ByteBuffer. No total length was counted now, and the ByteBuffer is directly written when Serializing in to TCP message. -> -> **Please pay attention when implementing multiple languages and SDKs.** Need to serialize PB data content into arrays of blocks(supported in PB codecs). - - - -## 2 PB format code: - -There mainly has three kinds of PB messages in TubeMQ: -- RPC related messages : `RPC.proto` -- Master related messages : `MasterService.proto` -- Broker related messages : `BrokerService.proto` - -`RPC.proto` defines 6 struct, which divided into 2 class: Request message and Response message. Response message is divided into Successful Response and Exception Response. - -The request message encoding and response message decoding can be implemented in the `NettyClient.java` class. There is some room for improvement in this part of the definition and can be found in [TUBEMQ-109](https://issues.apache.org/jira/browse/TUBEMQ-109). However, due to compatibility concerns, it will be gradually replaced. 
We have implemented the current protobuf version, which is not a problem until at least 1.0.0. With the new protocol, the protocol implementation module requires each SDK to allow room for improvement. Take request message as an example, `RpcConnHeader` and other related structures are as follows: -```protobuf -message RpcConnHeader { - required int32 flag = 1; - optional int64 traceId = 2; - optional int64 spanId = 3; - optional int64 parentId = 4; -} - -message RequestHeader { - optional int32 serviceType = 1; - optional int32 protocolVer = 2; -} - -message RequestBody { - required int32 method = 1; - optional int64 timeout = 2; - optional bytes request = 3; -} -``` - -Flag marks whether the message is requested or not, and the next three marks represent the content of the message trace, which is not currently used; the related is a fixed mapping of the service type, protocol version, service type, etc., the more critical parameter RequestBody.timeout is the maximum allowable time from when a request is received by the server to when it is actually processed. Long wait time, discarded if exceeded, current default is 10 seconds, request filled as follows. -```java -RequestWrapper requestWrapper = - new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface), - RpcProtocol.RPC_PROTOCOL_VERSION, - RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST, - requestTimeout); // request timeout -``` - - -## 3 Interactive diagram of the client's PB request & response: - -**Producer Interaction**: - -The Producer has four pairs of instructions in the system, registration to master, heartbeat to master, exit from master and sending message to brokers. -![](img/client_rpc/rpc_producer_diagram.png) - -Here we can see, Producer's implementation logic is to get metadata such as the list of partitions of specified topic from master, then select a partition and send message via TCP connection according to the rules of the client. 
It may be unsafe to send message without registration to master, the initial consideration was to use internal intake messages as much as possible and after that, considering security issues, we added authorization information carrying on top of this to perform authentication and authorization checks on the server side, solving the situation where the client bypasses the direct connection to the master and sends messages without authorization. But this will only enable in production environment. - -**Note in producer side of multiple languages implementation:** - -1. Our Master is running as a hot-swap master, and the switchover is based on the information carried by the `RspExceptionBody`. In this case, you need to search for the keywords `"StandbyException"`, If this type of exception occurs, switch to another Master node for re-registration. This part has some relevant issues to adjust to the problem. - -2. Producer should re-register in the event of a Master connection failure during production, e.g. timeout, passive connection break, etc. - -3. Producer side should pay attention to the Broker pre-connection operation in advance: the back-end cluster can have hundreds of Broker nodes, and each Broker has about ten partitions, so there will be thousands of possible records about the partition, after the SDK receives the metadata information from the Master, it should perform the connection establishment operation on the Broker that has not yet built the chain in advance. - -4. The Producer to Broker connection should be aware of anomaly detection and should be able to detect Broker bad spots and long periods of no messages, and to recycle the connection to Broker to avoid unstable operation in long-term running scene. - -**Consumer Interaction Diagram**: - -Consumer has 7 pairs of command in all, Register, Heartbeat, Exit to Master; Register, Logout, Heartbeat, Pulling mseeage to Broker. 
Registration and Logout with the Broker use the same command, distinguished by a status code. - -![](img/client_rpc/rpc_consumer_diagram.png) - -As the picture above shows, the Consumer first has to register with the Master, but registration does not return metadata immediately: TubeMQ uses a server-side load-balancing model, so the client has to wait for the server to dispatch the partitions to consume. The Consumer also has to register with and log out from the Broker. A partition is consumed exclusively, i.e., at any given time a partition can be consumed by only one consumer in the same group, so the client registers to obtain the right to consume the partition. Message pulls and consumption confirmations need to appear in pairs. Although the protocol supports pulling several times and acknowledging only once at the end, the client may lose its consumption right on the partition through a timeout, which triggers a data rollback and repeated consumption; the more unconfirmed data has accumulated, the more is consumed again, so pair each pull with exactly one confirmation (1:1). 
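
The 1:1 pairing of pulls and confirmations described above can be sketched as a small loop. This is only an illustration: `PartitionSession`, `MessageHandler`, and `PairedPullSketch` are hypothetical names invented here, not TubeMQ SDK types, and the meaning of `confirm(false)` (report the batch as not consumed) is an assumption.

```java
/** Hypothetical view of one partition connection; not a TubeMQ SDK type. */
interface PartitionSession {
    String[] pull();                 // fetch the next batch of messages
    void confirm(boolean consumed);  // acknowledge the batch returned by the last pull
}

/** Hypothetical business callback. */
interface MessageHandler {
    void handle(String message) throws Exception;
}

final class PairedPullSketch {
    /**
     * Runs {@code rounds} pull/confirm cycles and returns how many messages
     * were handled in batches that were confirmed as consumed.
     */
    static int consume(PartitionSession session, MessageHandler handler, int rounds) {
        int handled = 0;
        for (int i = 0; i < rounds; i++) {
            String[] batch = session.pull();
            boolean ok = true;
            int inBatch = 0;
            try {
                for (String message : batch) {
                    handler.handle(message);
                    inBatch++;
                }
            } catch (Exception e) {
                ok = false; // assumption: report the batch as not consumed
            }
            // Exactly one confirm per pull, issued before the next pull.
            session.confirm(ok);
            if (ok) {
                handled += inBatch;
            }
        }
        return handled;
    }
}
```

Because every pull is confirmed before the next one starts, at most one batch is ever unconfirmed, which bounds how much data can be consumed again if the partition right is lost.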
- -## 4 Client feature: - -| **FEATURE** | **Java** | **C/C++** | **Go** | **Python** | **Rust** | **NOTE** | -| --- | --- | --- | --- | --- | --- | --- | -| TLS | ✅ | | | | | | -| Authorization | ✅ | | | | | | -| Anti-bypass-master production/consumption | ✅ | | | | | | -| Distributed system with clients accessing Broker without Master's authentication authorization | ✅ | | | | | | -| Effectively-Once | ✅ | | | | | | -| Partition offset consumption | ✅ | | | | | | -| Multiple Topic Consumption for a single Consumer group | ✅ | | | | | | -| Server Consumption filter | ✅ | | | | | | -| Auto shielding inactive Nodes | ✅ | | | | | | -| Auto shielding bad Brokers | ✅ | | | | | | -| Auto reconnect | ✅ | | | | | | -| Auto recycling of Idle Connection | ✅ | | | | | | -| Inactive for more than a specified period (e.g. 3min, mainly the producer side) | ✅ | | | | | | -| Connection reuse | ✅ | | | | | | -| Connection sharing according to the sessionFactory | ✅ | | | | | | -| Unconnection reuse | ✅ | | | | | | -| Asynchronous Production | ✅ | | | | | | -| Synchronous Production | ✅ | | | | | | -| Pull Consumption | ✅ | | | | | | -| Push Consumption | ✅ | | | | | | -| Consumption limit (QOS) | ✅ | | | | | | -| Limit the amount of data per unit of time consumed by consumers | ✅ | | | | | | -| Pull Consumption frequency limit | ✅ | | | | | | -| Consumer Pull Consumption frequency limit | ✅ | | | | | | - - -## 5 Client function Induction CaseByCase: - -**Client side and server side RPC interaction process**: - ----------- - -![](img/client_rpc/rpc_inner_structure.png) - -As shown above, the client has to keep each sent request message locally until the RPC times out or a response message is received, and the response message is matched to its request by the SerialNo generated when the request was sent; the SDK stores the Broker and Topic information received from the server side locally, updates it with the latest returned information, and makes 
periodic reports to the Server side; the SDK maintains heartbeats to the Master and the Brokers, and if the Master reports a registration timeout error, the SDK should re-register; the SDK should establish connections per Broker, and for different objects in the same process it should let the business choose between per-object and per-process connections. - -### 5.1 Message: Producer register to Master: - ----------- - -```protobuf -message RegisterRequestP2M { - required string clientId = 1; - repeated string topicList = 2; - required int64 brokerCheckSum = 3; - required string hostName = 4; - optional MasterCertificateInfo authInfo = 5; - optional string jdkVersion = 6; - optional ApprovedClientConfig appdConfig = 7; -} - -message RegisterResponseM2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - required int64 brokerCheckSum = 4; - repeated string brokerInfos = 5; - optional MasterAuthorizedInfo authorizedInfo = 6; - optional ApprovedClientConfig appdConfig = 7; -} -``` - -**ClientId**: The Producer needs to construct a ClientId at startup; the current construction rule is: - -```java -ClientId = consumerGroup + "_" - + AddressUtils.getLocalAddress() + "_" // local ip (IPV4) - + pid + "_" // processId - + timestamp + "_" // timestamp - + counter + "_" // increment counter - + consumerType + "_" // type of consumer, including Pull and Push - + clientVersion; // version for client -``` -It is recommended that SDKs in other languages include the same markers to make troubleshooting easier. The ID value is valid for the lifetime of the Producer. - -**TopicList**: The list of topics published by the user. The Producer provides the initial list of topics to publish to at initialization, and the business can add new topics later at runtime via the publish function, but removing topics at runtime is not supported. 
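
For SDKs in other languages, the ClientId rule above amounts to joining seven parts with underscores. A minimal sketch follows, assuming the caller supplies every part; `ClientIdSketch` and its parameter names are hypothetical and not part of the TubeMQ SDK.

```java
final class ClientIdSketch {
    /**
     * Joins the parts listed in the construction rule with "_".
     * All values are passed in so that the result is deterministic;
     * how an SDK obtains them (local IP lookup, process ID, counter)
     * is left to the implementation.
     */
    static String build(String group, String localIp, long pid, long timestampMs,
                        int counter, String consumerType, String clientVersion) {
        return String.join("_",
                group,
                localIp,
                Long.toString(pid),
                Long.toString(timestampMs),
                Integer.toString(counter),
                consumerType,
                clientVersion);
    }
}
```

In Java 9 and later, `ProcessHandle.current().pid()` is one way to obtain the process ID; the increment counter would typically be a process-wide atomic counter so that several clients in one process get distinct IDs.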
- -**brokerCheckSum**: The check value of the Broker metadata stored locally by the client. At initial startup the Producer has no such data locally, so the value is -1. The SDK needs to carry the last brokerCheckSum value on each request, and the Master compares it to determine whether the client's metadata needs to be updated. - -**hostname**: The IPV4 address value of the machine where the Producer is located. - -**success**: Whether the operation succeeded: true on success, false on failure. - -**errCode**: The error code. Currently one error code represents a broad class of errors; the specific cause has to be identified from `errMsg`. - -**errMsg**: The specific error message, which the SDK should print when something goes wrong. - -**authInfo**: Authentication and authorization information. It is filled in if the user configuration enables authentication. If authentication is required, report the signature of the user name and password. During operation, for example on a heartbeat, if the Master forces authentication, again report the signature of the user name and password; otherwise authenticate with the authorization Token provided by the Master during the previous interaction. This authorization Token is also carried with messages sent to the Broker during production. - -**brokerInfos**: Broker metadata. In this field the Master returns to the Producer the list of Broker information for the entire cluster; the format is as follows. 
- -```java -public BrokerInfo(String strBrokerInfo, int brokerPort) { - String[] strBrokers = - strBrokerInfo.split(TokenConstants.ATTR_SEP); - this.brokerId = Integer.parseInt(strBrokers[0]); - this.host = strBrokers[1]; - this.port = brokerPort; - if (!TStringUtils.isBlank(strBrokers[2])) { - this.port = Integer.parseInt(strBrokers[2]); - } - this.buildStrInfo(); - } -``` - -**authorizedInfo**: Master provides authorization information in the following format. - -```protobuf -message MasterAuthorizedInfo { - required int64 visitAuthorizedToken = 1; - optional string authAuthorizedToken = 2; -} -``` - -**visitAuthorizedToken**: To prevent clients from bypassing the Master's access authorization token, if that data is available, the SDK should save it locally and carry that information on subsequent visits to the Broker; if the field is changed on subsequent heartbeats, the locally cached data for that field needs to be updated. - -**authAuthorizedToken**:Authenticated authorization tokens, if they have data for that field, they need to save and carry that field information for subsequent accesses to the Master and Broker; if the field is changed on subsequent heartbeats, the local cache of that field data needs to be updated. 
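
Returning to the `brokerInfos` strings described earlier in this section, a defensive parser in the spirit of the `BrokerInfo` constructor might look like the sketch below. `BrokerAddress` is a hypothetical stand-in, the `:` separator is an assumption taken from the `brokerId:host:port` comment in the proto definitions, and the host is assumed to be an IPv4 address or host name without colons.

```java
/** Illustrative stand-in for the SDK's BrokerInfo; not the real class. */
final class BrokerAddress {
    final int brokerId;
    final String host;
    final int port;

    private BrokerAddress(int brokerId, String host, int port) {
        this.brokerId = brokerId;
        this.host = host;
        this.port = port;
    }

    /** Parses "brokerId:host[:port]", falling back to defaultPort when the port is absent. */
    static BrokerAddress parse(String info, int defaultPort) {
        if (info == null) {
            throw new IllegalArgumentException("broker info is null");
        }
        String[] fields = info.trim().split(":");
        if (fields.length < 2) {
            throw new IllegalArgumentException("expected brokerId:host[:port], got: " + info);
        }
        int brokerId = Integer.parseInt(fields[0].trim());
        String host = fields[1].trim();
        int port = defaultPort;
        // String.split drops trailing empty fields, so check the length
        // before reading the optional port instead of indexing directly.
        if (fields.length > 2 && !fields[2].trim().isEmpty()) {
            port = Integer.parseInt(fields[2].trim());
        }
        return new BrokerAddress(brokerId, host, port);
    }
}
```

The length check matters in ports to other languages too: an entry without an explicit port should fall back to the default rather than fail on a missing third field.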
- - -### 5.2 Message: Heartbeat from Producer to Master: - ----------- - -```protobuf -message HeartRequestP2M { - required string clientId = 1; - required int64 brokerCheckSum = 2; - required string hostName = 3; - repeated string topicList = 4; - optional MasterCertificateInfo authInfo = 5; - optional ApprovedClientConfig appdConfig = 6; -} - -message HeartResponseM2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - required int64 brokerCheckSum = 4; - /* brokerId:host:port-topic:partitionNum */ - repeated string topicInfos = 5; - repeated string brokerInfos = 6; - optional bool requireAuth = 7; - optional MasterAuthorizedInfo authorizedInfo = 8; - optional ApprovedClientConfig appdConfig = 9; -} -``` - -**topicInfos**: The metadata for the Topics published by the SDK, including the partition information and the Brokers where the partitions are located. Because there is a lot of metadata, passing the object data through as-is would generate very large outbound traffic, so the encoding was improved; it is decoded as follows. 
- -```java -public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo( - Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) { - List<TopicInfo> topicList = new ArrayList<>(); - Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>(); - if (strTopicInfos == null || strTopicInfos.isEmpty()) { - return new Tuple2<>(topicMaxSizeInBMap, topicList); - } - String[] strInfo; - String[] strTopicInfoSet; - String[] strTopicInfo; - BrokerInfo brokerInfo; - for (String info : strTopicInfos) { - if (info == null || info.isEmpty()) { - continue; - } - info = info.trim(); - strInfo = info.split(TokenConstants.SEGMENT_SEP, -1); - strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); - for (String s : strTopicInfoSet) { - strTopicInfo = s.split(TokenConstants.ATTR_SEP); - brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); - if (brokerInfo != null) { - topicList.add(new TopicInfo(brokerInfo, - strInfo[0], Integer.parseInt(strTopicInfo[1]), - Integer.parseInt(strTopicInfo[2]), true, true)); - } - } - if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) { - continue; - } - try { - topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2])); - } catch (Throwable e) { - // ignore an unparsable max-size value - } - } - return new Tuple2<>(topicMaxSizeInBMap, topicList); - } -``` - -**requireAuth**: Indicates that the previous authAuthorizedToken issued by the Master has expired, requiring the SDK to report the user name and password signature on the next request. - -### 5.3 Message: Producer exits from Master: - ----------- - -```protobuf -message CloseRequestP2M{ - required string clientId = 1; - optional MasterCertificateInfo authInfo = 2; -} - -message CloseResponseM2P{ - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; -} -``` - -Note that if authentication is enabled, the close operation is also authenticated to avoid external interference with the operation. - -### 5.4 Message: Producer to Broker: - ----------- - -This part is related to the definition of RPC Message. 
- -```protobuf -message SendMessageRequestP2B { - required string clientId = 1; - required string topicName = 2; - required int32 partitionId = 3; - required bytes data = 4; - required int32 flag = 5; - required int32 checkSum = 6; - required int32 sentAddr = 7; - optional string msgType = 8; - optional string msgTime = 9; - optional AuthorizedInfo authInfo = 10; -} - -message SendMessageResponseB2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - optional bool requireAuth = 4; - optional int64 messageId = 5; - optional int64 appendTime = 6; - optional int64 appendOffset = 7; -} -``` -**Data** is the binary byte stream of Message. - -```java -private byte[] encodePayload(final Message message) { - final byte[] payload = message.getData(); - final String attribute = message.getAttribute(); - if (TStringUtils.isBlank(attribute)) { - return payload; - } - byte[] attrData = StringUtils.getBytesUtf8(attribute); - final ByteBuffer buffer = - ByteBuffer.allocate(4 + attrData.length + payload.length); - buffer.putInt(attrData.length); - buffer.put(attrData); - buffer.put(payload); - return buffer.array(); - } -``` - -**sentAddr** is the local IPv4 address of the machine where the SDK is located, converted to a 32-bit numeric ID. - -**msgType** is the message type used for filtering. `msgTime` is the message time set when the SDK sends a message; its value is the one filled in by `putSystemHeader` when the Message is constructed, and Message has a corresponding API to read it. - -**requireAuth**: Whether the Broker requires authentication for data production; not currently in effect for performance reasons. The authAuthorizedToken value carried in the sent message is the one provided by the Master and changes whenever the Master changes it. 
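
The `sentAddr` conversion mentioned above, from a dotted IPv4 string to a 32-bit number, can be sketched as follows. `SentAddrSketch` is a hypothetical helper, and the big-endian (network) byte order used here is an assumption; the text does not say which order the existing SDK uses, so check it before relying on interoperability.

```java
final class SentAddrSketch {
    /**
     * Packs a dotted-quad IPv4 address into a signed 32-bit int,
     * first octet in the most significant byte (assumed byte order).
     */
    static int ipv4ToInt(String ipv4) {
        String[] parts = ipv4.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not a dotted-quad IPv4 address: " + ipv4);
        }
        int result = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range in: " + ipv4);
            }
            result = (result << 8) | octet;
        }
        return result;
    }
}
```

Java's `int` is signed, so addresses whose first octet is 128 or higher come out negative; the bit pattern is still the same 32 bits that an `int32` proto field carries.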
- -### 5.5 Partition Loadbalance: - ----------- - -Apache TubeMQ currently uses a server-side load balancing mode, where the balancing process is managed and maintained by the server; subsequent versions will add a client-side load balancing mode, so that the two modes can co-exist. - -**Server side load balancing**: - -- When the Master process starts, it starts the load-balancing thread `balancerChore`, which periodically checks whether the currently registered consumer groups need load balancing. In short, the process distributes the partitions subscribed by a consumer group evenly across the registered clients and periodically checks whether any client holds more partitions than its share; if so, the extra partitions are moved to clients with fewer subscriptions. In detail, the Master first checks whether the consumer group needs load balancing. If it does, the Master sorts all partitions of the topics subscribed by the group and all consumer IDs of the group, then divides the total number of partitions by the number of clients (division and modulo) to get the maximum number of partitions each client may subscribe to. It then assigns partitions to each client and carries the partition information in the heartbeat response. If a client currently holds more partitions than its share, the Master sends it a partition release instruction to take the partition away from that consumer, and sends the consumer that is to receive the partition a partition assignment instruction. The instruction format is as follows. - -```protobuf -message EventProto{ - optional int64 rebalanceId = 1; - optional int32 opType = 2; - optional int32 status = 3; - /* consumerId@group-brokerId:host:port-topic:partitionId */ - repeated string subscribeInfo = 4; -} -``` - -**rebalanceId**: An auto-incrementing long value that identifies the load-balancing round. 
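
The division-and-modulo step in the server-side balancing description can be illustrated with a small calculation. This is a sketch of an even split under the assumption that the remainder goes to the first clients in sorted order; `PartitionQuotaSketch` is a hypothetical name and this is not the Master's actual balancer code.

```java
final class PartitionQuotaSketch {
    /**
     * Returns how many partitions each of {@code clientCount} clients would
     * hold if {@code partitionCount} partitions were spread as evenly as possible.
     */
    static int[] quotas(int partitionCount, int clientCount) {
        if (partitionCount < 0 || clientCount <= 0) {
            throw new IllegalArgumentException("need partitionCount >= 0 and clientCount > 0");
        }
        int base = partitionCount / clientCount;   // every client gets at least this many
        int extra = partitionCount % clientCount;  // this many clients get one more
        int[] result = new int[clientCount];
        for (int i = 0; i < clientCount; i++) {
            result[i] = base + (i < extra ? 1 : 0);
        }
        return result;
    }

    /** Upper bound on the partitions any single client may hold. */
    static int maxPerClient(int partitionCount, int clientCount) {
        return quotas(partitionCount, clientCount)[0];
    }
}
```

With 10 partitions and 3 clients the quotas are 4, 3, and 3, so a client holding 5 partitions would be asked to release one, and a client holding 2 would be assigned one more.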
- -**opType**: The operation code, whose values are defined in `EventType`. Only four kinds of opcode have been implemented so far: `DISCONNECT`, `CONNECT`, `REPORT`, and the `ONLY_` variants. The opcodes starting with `ONLY_` have not been developed in detail. - -```java -switch (event.getType()) { - case DISCONNECT: - case ONLY_DISCONNECT: - disconnectFromBroker(event); - rebalanceResults.put(event); - break; - case CONNECT: - case ONLY_CONNECT: - connect2Broker(event); - rebalanceResults.put(event); - break; - case REPORT: - reportSubscribeInfo(); - break; - case STOPREBALANCE: - break; - default: - throw new TubeClientException(strBuffer - .append("Invalid rebalance opCode:") - .append(event.getType()).toString()); -} -``` - -**status**: Defined in `EventStatus`; indicates the status of the event. When the Master constructs a load-balancing task, it sets the status to `TODO`. When it receives the client's heartbeat request, the Master writes the task into the response message and sets the status to `PROCESSING`. The client receives the load-balancing command from the heartbeat response and performs the actual connect or disconnect operation; once the operation has finished, it sets the command status to `DONE` and returns it to the Master with the next heartbeat. - -```java -public enum EventStatus { - /** - * To be processed state. - * */ - TODO(0, "To be processed"), - /** - * On processing state. - * */ - PROCESSING(1, "Being processed"), - /** - * Processed state. - * */ - DONE(2, "Process Done"), - - /** - * Unknown state. - * */ - UNKNOWN(-1, "Unknown event status"), - /** - * Failed state. - * */ - FAILED(-2, "Process failed"); -} -``` - -**subscribeInfo** carries the assigned partition information, in the format given in the comment in the proto definition. - - -- Consumer operation: When the consumer receives the metadata returned from the Master, it performs the connect and release operations (see the opType notes above). 
Once the connection is established, the consumer returns the operation result to the Master so that it can receive and carry out subsequent tasks. Note that load-balancing registration is a best-effort operation: if a new consumer requests a connection before the consumer occupying the partition has quit, it receives a `PARTITION_OCCUPIED` exception response and should then remove the partition from its attempt queue. The consumer that previously held the partition also removes it when it receives the corresponding response, so that the new consumer can register to this partition successfully in the next round of load balancing. \ No newline at end of file diff --git a/docs/modules/tubemq/img/client_rpc/rpc_broker_info.png b/docs/modules/tubemq/img/client_rpc/rpc_broker_info.png deleted file mode 100644 index 4747a884b14..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_broker_info.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_bytes_def.png b/docs/modules/tubemq/img/client_rpc/rpc_bytes_def.png deleted file mode 100644 index 45a238426ce..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_bytes_def.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_conn_detail.png b/docs/modules/tubemq/img/client_rpc/rpc_conn_detail.png deleted file mode 100644 index 6e803af5bd2..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_conn_detail.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png b/docs/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png deleted file mode 100644 index f761f54403a..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png b/docs/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png deleted file mode 100644 index 6c5bffac056..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png 
and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_event_proto.png b/docs/modules/tubemq/img/client_rpc/rpc_event_proto.png deleted file mode 100644 index 430d29737aa..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_event_proto.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png b/docs/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png deleted file mode 100644 index 9685b802526..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_event_proto_status.png b/docs/modules/tubemq/img/client_rpc/rpc_event_proto_status.png deleted file mode 100644 index 7a787cceb9c..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_event_proto_status.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_header_fill.png b/docs/modules/tubemq/img/client_rpc/rpc_header_fill.png deleted file mode 100644 index 0023e89abdb..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_header_fill.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_inner_structure.png b/docs/modules/tubemq/img/client_rpc/rpc_inner_structure.png deleted file mode 100644 index 9533ce46b30..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_inner_structure.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png b/docs/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png deleted file mode 100644 index 097fb0586cc..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_message_data.png b/docs/modules/tubemq/img/client_rpc/rpc_message_data.png deleted file mode 100644 index fa7a66ef952..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_message_data.png and 
/dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png b/docs/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png deleted file mode 100644 index 1ec4faf637d..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_producer_close2M.png b/docs/modules/tubemq/img/client_rpc/rpc_producer_close2M.png deleted file mode 100644 index 5342d62182f..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_producer_close2M.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_producer_diagram.png b/docs/modules/tubemq/img/client_rpc/rpc_producer_diagram.png deleted file mode 100644 index 9d087e77f8d..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_producer_diagram.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png b/docs/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png deleted file mode 100644 index 3dc4367cbb4..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_producer_register2M.png b/docs/modules/tubemq/img/client_rpc/rpc_producer_register2M.png deleted file mode 100644 index 6add74c04f5..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_producer_register2M.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png b/docs/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png deleted file mode 100644 index 2a81905e09b..00000000000 Binary files a/docs/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png and /dev/null differ diff --git a/docs/modules/tubemq/img/client_rpc/rpc_proto_def.png b/docs/modules/tubemq/img/client_rpc/rpc_proto_def.png deleted file mode 100644 index f56c2755db0..00000000000 Binary files 
a/docs/modules/tubemq/img/client_rpc/rpc_proto_def.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_frame.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_frame.png new file mode 100644 index 00000000000..28a388c3693 Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_frame.png differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc.png new file mode 100644 index 00000000000..a76bd86951f Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc.png differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_consumer.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_consumer.png new file mode 100644 index 00000000000..16eb8c61a5b Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_consumer.png differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_producer.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_producer.png new file mode 100644 index 00000000000..66495b9aa25 Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/img/tubemq_rpc_producer.png differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md new file mode 100644 index 00000000000..09639fccd7d --- /dev/null +++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md @@ -0,0 +1,466 @@ +--- +title: TubeMQ 二进制交互协议 +sidebar_position: 4 +--- + +## 概述 + +InLong TubeMQ 模块的各个节点间(Client、Master、Broker)以 TCP 长连接方式进行交互,采用自定义的二进制编码协议构造交互的请求和响应消息。本文主要介绍该二进制协议的定义,及示例如何通过该协议完成 TubeMQ 的生产和消费交互全过程。 + +## TubeMQ 消息格式 + +下图是 TubeMQ 消息格式定义示意图: + +![TubeMQ 协议格式](img/tubemq_frame.png) + +从上图可以看出,每条交互消息都是由固定 3 个部分组成: + +- MsgToken: 该字段用来标识 TubeMQ 消息合法性,每个 TubeMQ 消息都会携带规定的 `RPC_PROTOCOL_BEGIN_TOKEN` 参数值。客户端收到不是以该字段开始的消息时,说明消息非 TubeMQ 发送的合法消息,可以根据策略关闭该连接,提示错误退出或者重连; + +- SerialNo:消息序列号,由请求方生成,并由请求的接收方在响应消息里原样返回,供响应接收方关联响应消息锁对应的请求; + +- 消息内容部分:该部分由 Protobuf 编码,通过几个复合部分组成: + + - ListSize:4 字节,表示按照 Protobuf 编码后的数据以一定长度切割后的数据块总个数,目前协议定义下该字段不为 0; + + - `[]`: 数据块,采用 2 个字段组成,表示发送的数据块长度及数据内容,其中: + + - Length:标识数据块长度 + + - Data:标识数据块的二进制数据内容 + +为什么会以 `ListSize []` 形式来定义 Protobuf (下文简称 PB) 编码的数据内容? +主要原因是 TubeMQ 最初实现中序列化后的 PB 数据采用的是 ByteBuffer 对象保存,Java 里 ByteBuffer 单个最大块长上限是 8196 字节,超过单个块长度的 PB 消息内容就被多个 ByteBuffer 保存;而数据序列化到 TCP 消息时没有统计总长,直接按照 PB 序列化的 ByteBuffer 列表写入到了消息中。 +**在多语言实现时候,这块需要特别注意:** 需要将 PB 数据内容序列化成块数组(PB 编解码里有对应支持)。 + +消息内容部分的 PB 编解码文件存储在 `org.apache.inlong.tubemq.corerpc` 模块下,详细的格式定义参考相关文件。 + +### PB格式编码 + +PB 协议分为三个部分: + +- RPC 框架定义:`RPC.proto` + +- Master 相关的消息编码:`MasterService.proto` + +- Broker 相关的消息编码:`BrokerService.proto` + +这些协议定义文件在通过 PB 直接编译后获得对应的实现类。 + +以 RPC.proto 为例介绍,RPC.proto 定义了 6 个结构,分为 2 种类型: + +- 请求消息 + +- 响应消息,包括正常响应返回以及抛异常情况下的响应返回 + +请求消息编码及响应消息解码可以参考 `NettyClient.java` 类实现,这个部分的定义存在一些改进空间,具体见 [TUBEMQ-109](https://issues.apache.org/jira/browse/TUBEMQ-109),但由于兼容性考虑,会逐步的替换,按照当前 proto 版本实现至少在 1.0.0 版本前交互不是问题,但 1.0.0 时会考虑用新协议,协议实现模块需要各个 SDK 预留出改进空间。 + +以请求消息填写为例,RpcConnHeader 等相关结构如下: + +```protobuf +message RpcConnHeader { + required int32 flag = 1; + optional int64 traceId = 2; + optional int64 spanId = 3; + optional int64 parentId = 4; +} + +message RequestHeader { + optional int32 serviceType = 1; + optional int32 protocolVer = 2; 
+} + +message RequestBody { + required int32 method = 1; + optional int64 timeout = 2; + optional bytes request = 3; +} +``` +其中: + +- `RpcConnHeader` 的 `flag` 标记的是否请求消息,后面 3 个字段标记的是消息跟踪的相关内容,目前没有使用; + +- `RequestHeader` 包含了服务类型,协议版本相关信息; + +- `RequestBody` 包含了请求方法,超时时间,请求内容,其中 `timeout` 是一个请求被服务器收到到实际处理时的最大允许等待时间长,超过就丢弃,目前缺省为 10 秒。 + +请求填写具体实现见如下部分: + +```java +RequestWrapper requestWrapper = + new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface), + RpcProtocol.RPC_PROTOCOL_VERSION, + RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST, + requestTimeout); // 请求超时时间 +``` +至此,TubeMQ 的协议格式定义介绍完成,接下来我们以这些协议格式组成的消息完成数据生产和消费。 + +## 客户端请求响应交互 + +### Producer 生产交互图 + +Producer 一共使用了 4 对指令:与 Master 节点进行注册,保持心跳,以及退出注册操作;与 Broker 节点交互上报消息: + +![Producer RPC 交互](img/tubemq_rpc_producer.png) + +从这里我们可以看到,Producer 从 Master 获取指定 Topic 对应的分区列表等元数据信息,获得这些信息后按照客户端的规则选择分区并把消息发送给对应的 Broker。 + +Producer 在 **多语言实现时需要注意:** + +- Master 是有主备节点的,只有主节点可以提供服务,当 Producer 链接到备节点时会收到 `StandbyException` 异常响应。此时客户端实现需要选择其他的 Master 节点进行注册,并最终选择注册上主 Master 节点为止; + +- 生产过程中遇到 Master 连接失败时,比如超时,链接被动断开等,Producer 必须发起到 Master 重注册请求; + +- Producer 从 Master 收到 Topic 的元数据信息后要提前做到 Broker 预连接操作,避免数据生产时突增大量链接请求影响消息上报性能; + +- Producer 到 Broker 的连接要做异常检测:长期运行场景要能检测出 Broker 故障节点,并且对于长期不发消息的链接要将其回收,提高数据上报服务的稳定性。 + +### Consumer 消费交互图 + +Consumer 一共使用了 8 对指令:与 Master 进行注册,心跳,注销操作;与 Broker 进行注册,注销,心跳,拉取消息,确认消息操作;其中到 Broker 的注册、注销使用了同一个命令名携带不同的状态码方式来标识和区分不同操作: + +![Consumer RPC 交互](img/tubemq_rpc_consumer.png) + +从上图示例我们可以看到: + +- Consumer 注册到主 Master 节点时,Master 并没有返回元数据信息到 Consumer,而是后续心跳环节返回。其原因在于示例的 Consumer 采用的是服务器端负载均衡模式,需要等待服务器派发消费分区信息后才能获得对应的消费分区; + +- Consumer 到 Broker 存在注册、注销操作,其原因在于消费时候分区是独占消费,即同一时刻同一分区者只能被同组的一个消费者进行消费,客户端通过注册操作获得分区的消费权限; + +- Consumer 消息拉取与消费确认需要成对出现,通过数据消费二次确认来尽可能减少重复消费问题,异常情况下的数据被漏消费问题。 + + +### 客户端与服务器端 RPC 交互过程 + +如下图示: + +![TubeMQ RPC 实现](img/tubemq_rpc.png) + +- 客户端与 TubeMQ 服务端交互时要维持已发请求消息的本地保存,直到 RPC 超时,或者收到响应消息; + +- 客户端通过响应消息携带的 SerialNo 值与之前缓存的已发送请求记录相关联; + +- 从 Master 
收到 Broker 及 Topic 元数据信息后,客户端要保存本地并使用最新的元数据进行更新,同时将缓存的元数据定期上报给 Master; + +- 客户端要维持到 Master 或者 Broker 的心跳,如果发现 Master 反馈注册超时错误时,要进行重注册操作; + +- 客户端要基于 Broker 进行连接建立,同一个进程不同对象之间允许业务选择按对象或按进程建立连接。 + +#### Producer 到 Master 注册 + +---------- + +```protobuf +message RegisterRequestP2M { + required string clientId = 1; + repeated string topicList = 2; + required int64 brokerCheckSum = 3; + required string hostName = 4; + optional MasterCertificateInfo authInfo = 5; + optional string jdkVersion = 6; + optional ApprovedClientConfig appdConfig = 7; +} + +message RegisterResponseM2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + required int64 brokerCheckSum = 4; + repeated string brokerInfos = 5; + optional MasterAuthorizedInfo authorizedInfo = 6; + optional ApprovedClientConfig appdConfig = 7; +} +``` + +- clientId:标识 Producer 对象,该 ID 值在 Producer 启动时构造并在 Producer 生命周期内有效,目前 Java 版本的 SDK 构造规则是: + + ```java + ClientId = consumerGroup + "_" + + AddressUtils.getLocalAddress() + "_" // 本机IP (IPV4) + + pid + "_" // 进程ID + + timestamp + "_" // 时间戳 + + counter + "_" // 自增计数器 + + consumerType + "_" // 消费者类型,包含 Pull 和 Push 两种类型 + + clientVersion; // 客户端版本号 + ``` + 建议其他语言增加如上标记,以便于问题排查; + +- topicList:标识用户发布的 Topic 列表,Producer 在初始化时候会提供初始的待发布数据的 Topic 列表,在运行中也允许业务通过 Publish 函数延迟的增加新的 Topic 及减少已 Publish 的 Topic; + +- brokerCheckSum:客户端本地保存的 Broker 元数据信息的校验值,初始启动时候 Producer 本地是没有该数据的,取 -1 值;SDK 需要在每次请求时把上次的 brokerCheckSum 值携带上,Master 通过比较该值来确定客户端的元数据是否需要更新; + +- hostname:Producer 所在机器的 IPV4 地址值; + +- success:操作是否成功,成功为 true,失败为 false; + +- errCode:错误码,结合 errMsg 信息判定具体的错误原因; + +- errMsg:错误信息,如果请求响应失败 SDK 需要把具体错误信息打出来 + +- authInfo:认证授权信息,如果用户配置里填写了启动认证处理,则进行填写;如果是要求认证,则按照用户名及密码的签名进行上报,如果是运行中,比如心跳时,如果 Master 强制认证处理,则按照用户名及密码签名上报,没有的话则根据之前交互时 Master 提供的授权 Token 进行认证;该授权 Token 在生产时候也用于到 Broker 的消息生产时携带。 + +- brokerInfos:Broker 元数据信息,该字段里主要是 Master 反馈给 Producer 的整个集群的 Broker 信息列表;其格式如下: + + ```java + public BrokerInfo(String strBrokerInfo, int 
brokerPort) { + String[] strBrokers = + strBrokerInfo.split(TokenConstants.ATTR_SEP); + this.brokerId = Integer.parseInt(strBrokers[0]); + this.host = strBrokers[1]; + this.port = brokerPort; + if (!TStringUtils.isBlank(strBrokers[2])) { + this.port = Integer.parseInt(strBrokers[2]); + } + this.buildStrInfo(); + } + ``` + +- authorizedInfo:Master 提供的授权信息,格式如下: + + ```protobuf + message MasterAuthorizedInfo { + required int64 visitAuthorizedToken = 1; + optional string authAuthorizedToken = 2; + } + ``` + +- visitAuthorizedToken:访问授权 Token,防止客户端绕开 Master 访问 Broker 节点,SDK 需要将该信息保存本地,并且在后续访问 Broker 时携带该信息;如果后续心跳时该字段有变更,则需要更新本地缓存的该字段数据; + +- authAuthorizedToken:认证通过的授权 Token,如果有该字段数据 SDK 需要保存并在后续访问 Master 及 Broker 时携带该字段信息;如果后续心跳时该字段有变更,则需要更新本地缓存的该字段数据; + + +#### Producer 到 Master 心跳 + +---------- + +```protobuf +message HeartRequestP2M { + required string clientId = 1; + required int64 brokerCheckSum = 2; + required string hostName = 3; + repeated string topicList = 4; + optional MasterCertificateInfo authInfo = 5; + optional ApprovedClientConfig appdConfig = 6; +} + +message HeartResponseM2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + required int64 brokerCheckSum = 4; + /* brokerId:host:port-topic:partitionNum */ + repeated string topicInfos = 5; + repeated string brokerInfos = 6; + optional bool requireAuth = 7; + optional MasterAuthorizedInfo authorizedInfo = 8; + optional ApprovedClientConfig appdConfig = 9; +} +``` + +- topicInfos:SDK 发布的 Topic 元数据信息,包括分区信息以及所在的 Broker 节点,具体解码方式如下: + + ```java + public static Tuple2, List> convertTopicInfo( + Map brokerInfoMap, List strTopicInfos) { + List topicList = new ArrayList<>(); + Map topicMaxSizeInBMap = new ConcurrentHashMap<>(); + if (strTopicInfos == null || strTopicInfos.isEmpty()) { + return new Tuple2<>(topicMaxSizeInBMap, topicList); + } + String[] strInfo; + String[] strTopicInfoSet; + String[] strTopicInfo; + BrokerInfo brokerInfo; + for (String info : 
strTopicInfos) { + if (info == null || info.isEmpty()) { + continue; + } + info = info.trim(); + strInfo = info.split(TokenConstants.SEGMENT_SEP, -1); + strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); + for (String s : strTopicInfoSet) { + strTopicInfo = s.split(TokenConstants.ATTR_SEP); + brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); + if (brokerInfo != null) { + topicList.add(new TopicInfo(brokerInfo, + strInfo[0], Integer.parseInt(strTopicInfo[1]), + Integer.parseInt(strTopicInfo[2]), true, true)); + } + } + if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) { + continue; + } + try { + topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2])); + } catch (Throwable e) { + // + } + } + return new Tuple2<>(topicMaxSizeInBMap, topicList); + } + ``` + +- requireAuth: indicates that the authorization code (authAuthorizedToken) previously issued by the Master has expired; the SDK must carry the username and password signature in its next request to authenticate again; + +#### Producer to Master close + +---------- + +```protobuf +message CloseRequestP2M{ + required string clientId = 1; + optional MasterCertificateInfo authInfo = 2; +} + +message CloseResponseM2P{ + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; +} +``` + +Note that when authentication is enabled, the close request is authenticated as well, to prevent outside interference. + +#### Producer to Broker message sending + +---------- + +This part is mainly tied to the definition of Message: + +```protobuf +message SendMessageRequestP2B { + required string clientId = 1; + required string topicName = 2; + required int32 partitionId = 3; + required bytes data = 4; + required int32 flag = 5; + required int32 checkSum = 6; + required int32 sentAddr = 7; + optional string msgType = 8; + optional string msgTime = 9; + optional AuthorizedInfo authInfo = 10; +} + +message SendMessageResponseB2P { + required bool success = 1; + required int32 errCode = 2; + required string errMsg = 3; + optional bool requireAuth = 4; + optional int64 messageId = 5; + optional int64 appendTime = 6; + optional int64 appendOffset = 7; +} +``` + +- data: the binary byte stream of the Message, implemented as follows: + + ```java + private
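byte[] encodePayload(final Message message) { + final byte[] payload = message.getData(); + final String attribute = message.getAttribute(); + if (TStringUtils.isBlank(attribute)) { + return payload; + } + byte[] attrData = StringUtils.getBytesUtf8(attribute); + final ByteBuffer buffer = + ByteBuffer.allocate(4 + attrData.length + payload.length); + buffer.putInt(attrData.length); + buffer.put(attrData); + buffer.put(payload); + return buffer.array(); + } + ``` + +- sentAddr: the IPv4 address of the machine where the SDK runs, converted to a 32-bit numeric ID; + +- msgType: the stream value the message belongs to, used for filtering on consumption; + +- msgTime: the message time when the SDK sends the message; its value comes from what was set through putSystemHeader when the Message was constructed; + +- requireAuth: a flag indicating whether authentication is required when producing data to the Broker. For performance reasons it is not currently in effect; the authAuthorizedToken carried in the send request follows the value provided by the Master and changes whenever the Master's value changes. + +#### Partition load balancing process + +---------- + +The InLong TubeMQ module currently supports 2 balancing modes, server-side load balancing and client-side balancing; the business chooses between them as needed. + +Server-side balancing is managed and maintained by the server and places relatively low demands on the Consumer implementation. The process is as follows: + +1. After the Master process starts, it launches the load-balancing thread balancerChore, which periodically checks the currently registered consumer groups and performs load balancing. In short, it distributes the partitions subscribed by a consumer group evenly among the registered clients and periodically checks whether a client's current partition count exceeds the expected number; if so, the surplus partitions are reassigned to clients that hold fewer. + +2.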
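The Master checks whether the current consumer group needs load balancing. If it does, the Master sorts all partitions of the Topic set subscribed by the consumer group, as well as all consumer IDs in the group, then applies integer division and modulo to the group's total partition count and the number of clients to obtain the maximum number of partitions each client may subscribe to. It then assigns partitions to each client and carries the partition information in the heartbeat response when the consumer subscribes. If a client currently holds surplus partitions, the Master sends that client a partition-release instruction to release the partition from that consumer, and sends the newly assigned consumer a partition-assignment instruction telling it that the partition has been assigned to it. The instruction is defined as follows: + + ```protobuf + message EventProto{ + optional int64 rebalanceId = 1; + optional int32 opType = 2; + optional int32 status = 3; + /* consumerId@group-brokerId:host:port-topic:partitionId */ + repeated string subscribeInfo = 4; + } + ``` + Where: + + - rebalanceId: an auto-incrementing long ID that identifies the load-balancing round; + + - subscribeInfo: the assigned partition information; + + - opType: the operation code, whose values are defined in EventType. Only 4 operation codes are implemented so far, for releasing and establishing connections; the only\_xxx variants are not yet handled differently. After receiving the load-balancing information carried in a heartbeat, the Consumer performs the corresponding operation according to this value; + + ```java + switch (event.getType()) { + case DISCONNECT: + case ONLY_DISCONNECT: + disconnectFromBroker(event); + rebalanceResults.put(event); + break; + case CONNECT: + case ONLY_CONNECT: + connect2Broker(event); + rebalanceResults.put(event); + break; + case REPORT: + reportSubscribeInfo(); + break; + case STOPREBALANCE: + break; + default: + throw new TubeClientException(strBuffer + .append("Invalid rebalance opCode:") + .append(event.getType()).toString()); + } + ``` + - status: the state of the event, defined in EventStatus: + + ```java + public enum EventStatus { + /** + * To be processed state. + * */ + TODO(0, "To be processed"), + /** + * On processing state. + * */ + PROCESSING(1, "Being processed"), + /** + * Processed state. + * */ + DONE(2, "Process Done"), + + /** + * Unknown state. + * */ + UNKNOWN(-1, "Unknown event status"), + /** + * Failed state. + * */ + FAILED(-2, "Process failed"); + } + ``` + +3. When the Master constructs a load-balancing task, it sets the instruction status to TODO. When the client's heartbeat request arrives, the Master writes the task into the response message and sets the instruction status to PROCESSING. The client receives the load-balancing instruction from the heartbeat response, performs the actual connect or disconnect operation, sets the instruction status to DONE when finished, and reports it back to the Master when the next heartbeat request is sent; + +4.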
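Consumer-side operation: after receiving the metadata returned by the Master, the Consumer establishes and releases connections (see the opType notes above). Once the connection is established, it returns the event result to the Master, completing the cycle of receiving the task, executing it, and reporting the result. Note that load-balancing registration is a best-effort operation: if a consumer initiates a connection while the consumer that previously held the partition has not yet exited, it receives a `PARTITION_OCCUPIED` error response and removes the partition from its retry queue. The previous holder still releases the partition after receiving its own response, so the consumer assigned to this partition registers on it successfully in the next load-balancing round. + +At this point the consumer-side balancing operation is complete; once the consumer obtains the partition information, it registers and consumes data. diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/client_rpc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/client_rpc.md deleted file mode 100644 index 8d3668d0938..00000000000 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/client_rpc.md +++ /dev/null @@ -1,457 +0,0 @@ ---- -title: Client RPC ---- - -## 1 Overview: - -The nodes of the Apache InLong TubeMQ module (Client, Master, Broker) interact over long-lived TCP connections, and their messages are defined as a combination of binary framing and Protobuf encoding. - -![](img/client_rpc/rpc_bytes_def.png) - -On the TCP connection everything is a binary stream, consisting of: -- msgToken: the message header field `RPC_PROTOCOL_BEGIN_TOKEN`, 4 bytes, used to delimit each message and to verify the legitimacy of the peer. If a response received by the client does not begin with this field, either the peer does not speak a protocol supported by this system or the returned data is corrupted; the connection should then be closed, and the client should report the error and exit, or reconnect -- serialNo: the message sequence number, 4 bytes, mainly used by the client to associate a response with its request context. It is generated by the requester and carried to the server in the request message; after serving the request, the server returns it unchanged in the corresponding response message -- listSize: 4 bytes, the number of PB-encoded data blocks that follow; under the current protocol definition this field is never 0 -- `[]`: a combination of 2 fields giving the length and the content of a data block - - len: the length of the data block - - data: the data - -> Why is the Protobuf (hereinafter PB) data content defined in the form `listSize []`?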
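-> -> Because in the TubeMQ implementation, serialized PB data is held in ByteBuffer objects. In Java a single ByteBuffer has a maximum block length of 8196, so PB message content longer than one block must be stored in multiple ByteBuffers. When serializing to the TCP message, the total length is not computed; the list of ByteBuffers produced by PB serialization is written into the message directly. -> -> **Pay special attention to this when implementing in other languages:** the PB data content must be serialized into an array of blocks (PB codecs provide corresponding support). - -The client RPC implementation lives in the `org.apache.inlong.tubemq.corerpc` module. -## 2 PB format encoding: - -The PB protocol is divided into three parts: -- RPC framework definition: `RPC.proto` -- Master-related message encoding: `MasterService.proto` -- Broker-related message encoding: `BrokerService.proto` - -The corresponding implementation classes can be generated by compiling the PB files directly. - - -RPC.proto defines 6 structures, which fall into 2 types: -- request messages -- response messages, including normal responses and responses returned when an exception is thrown - -For request encoding and response decoding, refer to the `NettyClient.java` class. This part of the definition has room for improvement, see [TUBEMQ-109](https://issues.apache.org/jira/browse/TUBEMQ-109), but for compatibility reasons it will be replaced gradually. An implementation based on the current proto version will interoperate at least until version 1.0.0; a new protocol will be considered for 1.0.0, so each SDK should leave room for change in its protocol implementation module. - -Taking the filling of a request message as an example, RpcConnHeader and the related structures are as follows: - -```protobuf -message RpcConnHeader { - required int32 flag = 1; - optional int64 traceId = 2; - optional int64 spanId = 3; - optional int64 parentId = 4; -} - -message RequestHeader { - optional int32 serviceType = 1; - optional int32 protocolVer = 2; -} - -message RequestBody { - required int32 method = 1; - optional int64 timeout = 2; - optional bytes request = 3; -} -``` - -The `flag` field of `RpcConnHeader` marks whether the message is a request; the following 3 fields carry message-tracing information and are currently unused; - -`RequestHeader` contains the service type and the protocol version; - -`RequestBody` contains the request method, the timeout, and the request content. Here `timeout` is the maximum time a request may wait between being received by the server and actually being processed; requests that wait longer are discarded. The default is currently 10 seconds. The request is filled in as follows: -```java -RequestWrapper requestWrapper = - new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface), - RpcProtocol.RPC_PROTOCOL_VERSION, - RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST, - requestTimeout); // request timeout -``` - - -## 3 Client PB request/response interaction diagrams: - -### 3.1 Producer interaction diagram: - -The Producer uses 4 pairs of commands in total: register, heartbeat, and close toward the Master, and only send-message toward the Broker: -![](img/client_rpc/rpc_producer_diagram.png) - -As shown here, the Producer logic is to obtain metadata such as the partition list for the specified Topics from the Master, then select a partition according to the client's rules and send the message to the corresponding Broker. - -**Notes for multi-language implementations** of the Producer: -- The Master has active and standby nodes, and only the active node serves requests. When a producer connects to a standby node it receives a `StandbyException`, and it must then connect to another Master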
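node, repeating until it reaches the active node; - -- If the connection to the Master fails during production, for example on a timeout or when the connection is dropped by the peer, the Producer must re-register; - -- The Producer should pre-connect to Brokers ahead of time: a backend cluster can have hundreds of Broker nodes, and with roughly ten partitions per Broker there can be thousands of partition records. After receiving metadata from the Master, the SDK should establish connections in advance to the Brokers it has not yet connected to; - -- The Producer must detect anomalies on its Broker connections. In long-running scenarios it must be able to detect failed Broker nodes, and it should reclaim connections that have sent no messages for a long time, to avoid unstable operation. - - -### 3.2 Consumer interaction diagram: - -The Consumer uses 7 pairs of commands in total: register, heartbeat, and close toward the Master; and, toward the Broker, 4 pairs covering register/unregister, heartbeat, pull message, and confirm message, where registering and unregistering with the Broker share one command distinguished by a status code: -![](img/client_rpc/rpc_consumer_diagram.png) - -As the figure shows, the Consumer first registers with the Master, but does not obtain metadata immediately on registration, because TubeMQ uses server-side load balancing and the client must wait for the server to dispatch the partitions to consume. The Consumer must register with and unregister from the Broker because partitions are consumed exclusively: at any moment a partition can be consumed by only one consumer in the same group, so the client registers to obtain the right to consume the partition. Message pulls and consumption confirmations should come in pairs. Although the protocol allows several pulls followed by a single final confirmation, the client may time out and lose its right to consume the partition, which triggers a rollback and repeated consumption; the more data has accumulated, the more is consumed again, so confirming at a 1:1 ratio is more appropriate. - -## 4 Client feature set: - -| **Feature** | **Java** | **C/C++** | **Go** | **Python** | **Rust** | **Notes** | -|---------------------------------------| --- | --- | --- | --- | --- | --- | -| TLS | ✅ | | | | | | -| Authentication and authorization | ✅ | | | | | | -| Prevention of production/consumption that bypasses the Master | ✅ | | | | | | -| In a distributed system, prevents clients from accessing Brokers without the Master's authentication and authorization | ✅ | | | | | | -| Effectively-Once | ✅ | | | | | | -| Consumption from a precisely specified partition Offset | ✅ | | | | | | -| A single group consuming multiple Topics | ✅ | | | | | | -| Server-side filtered consumption | ✅ | | | | | | -| Automatic masking of bad production nodes | ✅ | | | | | | -| Detects bad nodes algorithmically and automatically stops sending data to failed Brokers | ✅ | | | | | | -| Automatic reconnection after disconnect | ✅ | | | | | | -| Automatic reclamation of idle connections | ✅ | | | | | | -| Inactive for more than a specified number of minutes, e.g. 3 minutes, mainly on the producer side | ✅ | | | | | | -| Connection reuse | ✅ | | | | | | -| Connections shared or not shared per sessionFactory | ✅ | | | | | | -| No connection reuse | ✅ | | | | | | -| Asynchronous production | ✅ | | | | | | -| Synchronous production | ✅ | | | | | | -| Pull consumption | ✅ | | | | | | -| Push consumption | ✅ | | | | | | -| Consumption rate limiting | ✅ | | | | | | -| Limits the amount of data a consumer consumes per unit time | ✅ | | | | | | -| Pull frequency control | ✅ | | | | | | -| Limits how often a consumer pulls messages | ✅ | | | | | | - - -## 5 Case-by-case introduction to client feature implementation: - -### 5.1 RPC interaction between client and server: - ----------- - -![](img/client_rpc/rpc_inner_structure.png) -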
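-As shown above, the client keeps a local copy of each sent request until the RPC times out or a response arrives; the response is matched to the request by the SerialNo generated when the request was sent. The SDK must store the Broker and Topic information received from the server locally, update it with the latest returned information, and report it to the server periodically. The SDK must maintain heartbeats to the Master or Broker and re-register if the Master returns a registration-timeout error. The SDK establishes connections per Broker, and across different objects in the same process it must let the business choose whether connections are established per object or per process. - -### 5.2 Producer to Master registration: - ----------- -```protobuf -message RegisterRequestP2M { - required string clientId = 1; - repeated string topicList = 2; - required int64 brokerCheckSum = 3; - required string hostName = 4; - optional MasterCertificateInfo authInfo = 5; - optional string jdkVersion = 6; - optional ApprovedClientConfig appdConfig = 7; -} - -message RegisterResponseM2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - required int64 brokerCheckSum = 4; - repeated string brokerInfos = 5; - optional MasterAuthorizedInfo authorizedInfo = 6; - optional ApprovedClientConfig appdConfig = 7; -} -``` - -**ClientId**: the Producer must construct a ClientId at startup. The current construction rule is: - -In the Java SDK: -```java -ClientId = consumerGroup + "_" - + AddressUtils.getLocalAddress() + "_" // local IP (IPv4) - + pid + "_" // process ID - + timestamp + "_" // timestamp - + counter + "_" // auto-increment counter - + consumerType + "_" // consumer type, either Pull or Push - + clientVersion; // client version -``` -SDKs in other languages are advised to include the same markers to ease troubleshooting. The ID is valid for the lifetime of the Producer; - -**TopicList**: the list of Topics published by the user. The Producer supplies an initial list of Topics to publish to at initialization; while running, the business may add new Topics later through the publish function, but removing topics at runtime is not supported; - -**brokerCheckSum**: the checksum of the Broker metadata held locally by the client. A Producer has no such data at first start, so it sends -1. The SDK must carry the last brokerCheckSum in every request, and the Master compares it to decide whether the client's metadata needs updating; - -**hostName**: the IPv4 address of the machine where the Producer runs; - -**success**: whether the operation succeeded, true on success and false on failure; - -**errCode**: the error code on failure. Error codes are currently coarse categories; the specific cause must be determined from errMsg; - -**errMsg**: the specific error message; on error, the SDK should print it - -**authInfo**: authentication and authorization information, filled in when the user's configuration enables authentication. When authentication is required, the SDK reports a signature of the username and password. During operation, for example on heartbeats, if the Master forces authentication the SDK likewise reports the username and password signature; otherwise it authenticates with the authorization Token the Master provided in an earlier exchange. The same Token is also carried when producing messages to the Broker. - -**brokerInfos**: Broker metadata; this field mainly carries, as returned by the Master to the Producer, the whole cluster's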
Broker information list. Its format is as follows: - -```java -public BrokerInfo(String strBrokerInfo, int brokerPort) { - String[] strBrokers = - strBrokerInfo.split(TokenConstants.ATTR_SEP); - this.brokerId = Integer.parseInt(strBrokers[0]); - this.host = strBrokers[1]; - this.port = brokerPort; - if (!TStringUtils.isBlank(strBrokers[2])) { - this.port = Integer.parseInt(strBrokers[2]); - } - this.buildStrInfo(); - } -``` - -**authorizedInfo**: the authorization information provided by the Master, in the following format: - -```protobuf -message MasterAuthorizedInfo { - required int64 visitAuthorizedToken = 1; - optional string authAuthorizedToken = 2; -} -``` - -**visitAuthorizedToken**: the access authorization Token that prevents clients from bypassing the Master. If it is present, the SDK must save it locally and carry it on subsequent Broker accesses; if the field changes in a later heartbeat, the locally cached value must be updated; - -**authAuthorizedToken**: the authorization Token issued after successful authentication. If this field is present, it must be saved and carried on subsequent accesses to the Master and Broker; if the field changes in a later heartbeat, the locally cached value must be updated; - - -### 5.3 Producer to Master heartbeat: - ----------- - -```protobuf -message HeartRequestP2M { - required string clientId = 1; - required int64 brokerCheckSum = 2; - required string hostName = 3; - repeated string topicList = 4; - optional MasterCertificateInfo authInfo = 5; - optional ApprovedClientConfig appdConfig = 6; -} - -message HeartResponseM2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - required int64 brokerCheckSum = 4; - /* brokerId:host:port-topic:partitionNum */ - repeated string topicInfos = 5; - repeated string brokerInfos = 6; - optional bool requireAuth = 7; - optional MasterAuthorizedInfo authorizedInfo = 8; - optional ApprovedClientConfig appdConfig = 9; -} -``` - -**topicInfos**: the metadata for the Topics published by the SDK, including partition information and the Broker hosting them. Because there is a great deal of metadata, passing the objects through as-is would generate very heavy outbound traffic, so the data is compactly encoded; it is decoded as follows: - -```java -public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo( - Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) { - List<TopicInfo> topicList = new ArrayList<>(); - Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>(); - if (strTopicInfos == null || strTopicInfos.isEmpty()) { - return new Tuple2<>(topicMaxSizeInBMap, topicList); - } - String[]
strInfo; - String[] strTopicInfoSet; - String[] strTopicInfo; - BrokerInfo brokerInfo; - for (String info : strTopicInfos) { - if (info == null || info.isEmpty()) { - continue; - } - info = info.trim(); - strInfo = info.split(TokenConstants.SEGMENT_SEP, -1); - strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); - for (String s : strTopicInfoSet) { - strTopicInfo = s.split(TokenConstants.ATTR_SEP); - brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); - if (brokerInfo != null) { - topicList.add(new TopicInfo(brokerInfo, - strInfo[0], Integer.parseInt(strTopicInfo[1]), - Integer.parseInt(strTopicInfo[2]), true, true)); - } - } - if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) { - continue; - } - try { - topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2])); - } catch (Throwable e) { - // - } - } - return new Tuple2<>(topicMaxSizeInBMap, topicList); - } -``` - -**requireAuth**: indicates that the authorization code (authAuthorizedToken) previously issued by the Master has expired; the SDK must report the username and password signature in its next request; - -### 5.4 Producer to Master close: - ----------- - -```protobuf -message CloseRequestP2M{ - required string clientId = 1; - optional MasterCertificateInfo authInfo = 2; -} - -message CloseResponseM2P{ - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; -} -``` - -Note that when authentication is enabled, the close request is authenticated as well, to prevent outside interference. - -### 5.5 Producer to Broker message sending: - ----------- - -This part is mainly tied to the definition of Message: - -```protobuf -message SendMessageRequestP2B { - required string clientId = 1; - required string topicName = 2; - required int32 partitionId = 3; - required bytes data = 4; - required int32 flag = 5; - required int32 checkSum = 6; - required int32 sentAddr = 7; - optional string msgType = 8; - optional string msgTime = 9; - optional AuthorizedInfo authInfo = 10; -} - -message SendMessageResponseB2P { - required bool success = 1; - required int32 errCode = 2; - required string errMsg = 3; - optional bool requireAuth = 4; - optional int64 messageId = 5; - optional int64
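appendTime = 6; - optional int64 appendOffset = 7; -} -``` - -**Data**: the binary byte stream of the Message: - -```java -private byte[] encodePayload(final Message message) { - final byte[] payload = message.getData(); - final String attribute = message.getAttribute(); - if (TStringUtils.isBlank(attribute)) { - return payload; - } - byte[] attrData = StringUtils.getBytesUtf8(attribute); - final ByteBuffer buffer = - ByteBuffer.allocate(4 + attrData.length + payload.length); - buffer.putInt(attrData.length); - buffer.put(attrData); - buffer.put(payload); - return buffer.array(); - } -``` - -**sentAddr**: the IPv4 address of the machine where the SDK runs, converted to a 32-bit numeric ID; - -**msgType**: the message type used for filtering; msgTime is the message time when the SDK sends the message, taken from the value set through putSystemHeader when the Message was constructed, and Message provides a corresponding API to read it; - -**requireAuth**: whether authentication is required when producing data to the Broker. For performance reasons it is not currently in effect; the authAuthorizedToken carried in the send request follows the value provided by the Master and changes whenever the Master's value changes. - -### 5.6 Partition load balancing process: - ----------- - -The Apache InLong TubeMQ module currently uses server-side load balancing, with the balancing process managed and maintained by the server. Later versions will add a client-side load-balancing mode, so that the 2 modes coexist and the business can choose between them as needed. - -**The server-side load-balancing process is as follows**: - -- After the Master process starts, it launches the load-balancing thread balancerChore, which periodically checks the currently registered consumer groups and performs load balancing. In short, it distributes the partitions subscribed by a consumer group evenly among the registered clients and periodically checks whether a client's current partition count exceeds the expected number; if so, the surplus partitions are reassigned to clients that hold fewer. In detail: the Master first checks whether the current consumer group needs load balancing. If it does, the Master sorts all partitions of the topic set subscribed by the consumer group, as well as all consumer IDs in the group, then applies integer division and modulo to the group's total partition count and the number of clients to obtain the maximum number of partitions each client may subscribe to. It then assigns partitions to each client and carries the partition information in the heartbeat response when the consumer subscribes. If a client currently holds surplus partitions, the Master sends that client a partition-release instruction to release the partition from that consumer, and sends the newly assigned consumer a partition-assignment instruction telling it that the partition has been assigned to it. The instruction is defined as follows: - -```protobuf -message EventProto{ - optional int64 rebalanceId = 1; - optional int32 opType = 2; - optional int32 status = 3; - /* consumerId@group-brokerId:host:port-topic:partitionId */ - repeated string subscribeInfo = 4; -} -``` - -**rebalanceId**: an auto-incrementing long ID that identifies the load-balancing round; - -**opType**: the operation code, whose values are defined in EventType. Only 4 operation codes are implemented so far, for releasing and establishing connections; the only\_xxx variants are not yet handled differently. After receiving the load-balancing information carried in a heartbeat, the Consumer performs the corresponding operation according to this value; - -```java -switch (event.getType()) { - case DISCONNECT: - case ONLY_DISCONNECT: - disconnectFromBroker(event); - rebalanceResults.put(event); - break; - case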
CONNECT: - case ONLY_CONNECT: - connect2Broker(event); - rebalanceResults.put(event); - break; - case REPORT: - reportSubscribeInfo(); - break; - case STOPREBALANCE: - break; - default: - throw new TubeClientException(strBuffer - .append("Invalid rebalance opCode:") - .append(event.getType()).toString()); -} -``` - -**status**: the state of the event, defined in EventStatus. When the Master constructs a load-balancing task, it sets the instruction status to TODO. When the client's heartbeat request arrives, the master writes the task into the response message and sets the instruction status to PROCESSING. The client receives the load-balancing instruction from the heartbeat response, performs the actual connect or disconnect operation, sets the instruction status to DONE when finished, and reports it back to the Master when the next heartbeat request is sent; - -```java -public enum EventStatus { - /** - * To be processed state. - * */ - TODO(0, "To be processed"), - /** - * On processing state. - * */ - PROCESSING(1, "Being processed"), - /** - * Processed state. - * */ - DONE(2, "Process Done"), - - /** - * Unknown state. - * */ - UNKNOWN(-1, "Unknown event status"), - /** - * Failed state. - * */ - FAILED(-2, "Process failed"); -} -``` - -**subscribeInfo**: the assigned partition information, in the format shown in the comment. - - -- Consumer-side operation: after receiving the metadata returned by the Master, the consumer establishes and releases connections (see the opType notes above). Once the connection is established, it returns the event result to the Master, completing the cycle of receiving the task, executing it, and reporting the result. Note that load-balancing registration is a best-effort operation: if a consumer initiates a connection while the consumer that previously held the partition has not yet exited, it receives a `PARTITION_OCCUPIED` error response and removes the partition from its retry queue. The previous holder still releases the partition after receiving its own response, so the consumer assigned to this partition registers on it successfully in the next load-balancing round. - ---- -Back to top diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_broker_info.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_broker_info.png deleted file mode 100644 index 4747a884b14..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_broker_info.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_bytes_def.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_bytes_def.png deleted file mode 100644 index 45a238426ce..00000000000 Binary files
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_bytes_def.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_conn_detail.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_conn_detail.png deleted file mode 100644 index 6e803af5bd2..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_conn_detail.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png deleted file mode 100644 index f761f54403a..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_consumer_diagram.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png deleted file mode 100644 index 6c5bffac056..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_convert_topicinfo.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto.png deleted file mode 100644 index 430d29737aa..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png deleted file mode 100644 index 9685b802526..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_optype.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_status.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_status.png deleted file mode 100644 index 7a787cceb9c..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_event_proto_status.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_header_fill.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_header_fill.png deleted file mode 100644 index 0023e89abdb..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_header_fill.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_inner_structure.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_inner_structure.png deleted file mode 100644 index 9533ce46b30..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_inner_structure.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png deleted file mode 100644 index 097fb0586cc..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_master_authorizedinfo.png and /dev/null differ 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_message_data.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_message_data.png deleted file mode 100644 index fa7a66ef952..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_message_data.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png deleted file mode 100644 index 1ec4faf637d..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_pbmsg_structure.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_close2M.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_close2M.png deleted file mode 100644 index 5342d62182f..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_close2M.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_diagram.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_diagram.png deleted file mode 100644 index 9d087e77f8d..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_diagram.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png deleted file mode 100644 index 3dc4367cbb4..00000000000 Binary files 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_heartbeat2M.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_register2M.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_register2M.png deleted file mode 100644 index 6add74c04f5..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_register2M.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png deleted file mode 100644 index 2a81905e09b..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_producer_sendmsg2B.png and /dev/null differ diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_proto_def.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_proto_def.png deleted file mode 100644 index f56c2755db0..00000000000 Binary files a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/tubemq/img/client_rpc/rpc_proto_def.png and /dev/null differ