TubeMQ binary protocol
Overview
The nodes of the InLong TubeMQ module (Client, Master, Broker) communicate over long-lived TCP connections and use a custom binary encoding protocol to build request and response messages. This article defines that binary protocol and walks through how the whole TubeMQ production and consumption interaction is carried out with it.
TubeMQ message format
The following figure is a schematic diagram of the TubeMQ message format definition:

As shown in the figure above, each interactive message consists of three fixed parts:
- MsgToken: this field identifies a legitimate TubeMQ message. Every TubeMQ message carries the specified RPC_PROTOCOL_BEGIN_TOKEN parameter value. If the client receives a message that does not start with this value, the message was not legitimately sent by TubeMQ; depending on its policy, the client can close the connection, report an error and exit, or reconnect;
- SerialNo: the message sequence number. It is generated by the requester and returned unchanged by the recipient in the response message, so that the requester can match each response to the request it answers;
- Message content part: this part is encoded with Protobuf and consists of the following fields:
  - ListSize: 4 bytes, the total number of data blocks obtained after the Protobuf-encoded data is cut into blocks of a certain length. This field is never 0 under the current protocol definition;
  - [<Length><Data>]: a data block made up of 2 fields, the length of the block and its content:
    - Length: the length of the data block;
    - Data: the binary content of the data block.
 
 
Why is the Protobuf (hereinafter referred to as PB) encoded data content defined in the form of ListSize [<Length><Data>]?
The main reason is that the initial implementation of TubeMQ stores the serialized PB data in ByteBuffer objects. In that implementation a single ByteBuffer block holds at most 8196 bytes, so PB message content longer than one block is spread across multiple ByteBuffers. When the data is written to the TCP message, no total length is written; the list of ByteBuffers produced by PB serialization is written directly. Implementations in other languages need to pay special attention here: the PB data content must be serialized into an array of blocks (the PB codec provides corresponding support).
The PB codec file of the message content is stored in the org.apache.inlong.tubemq.corerpc module. For detailed format definitions, refer to the relevant files.
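The framing described above can be sketched as follows. This is a minimal illustration, not the TubeMQ codec: it assumes MsgToken, SerialNo, and each Length are 4-byte big-endian integers (only ListSize is stated to be 4 bytes in the text), and BEGIN_TOKEN is a placeholder rather than the real RPC_PROTOCOL_BEGIN_TOKEN value. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class TubeFrameSketch {
    // Placeholder value (assumption); the real constant is RPC_PROTOCOL_BEGIN_TOKEN.
    static final int BEGIN_TOKEN = 0x7E5A7E5A;
    // Block length quoted in the text above.
    static final int MAX_BLOCK = 8196;

    /** Frames PB bytes as: token, serialNo, ListSize, then [Length][Data] per block. */
    static byte[] encode(int serialNo, byte[] pbBytes) {
        // At least one block is always written, since ListSize is never 0.
        int listSize = Math.max(1, (pbBytes.length + MAX_BLOCK - 1) / MAX_BLOCK);
        ByteBuffer out = ByteBuffer.allocate(12 + 4 * listSize + pbBytes.length);
        out.putInt(BEGIN_TOKEN).putInt(serialNo).putInt(listSize);
        for (int i = 0; i < listSize; i++) {
            int from = i * MAX_BLOCK;
            int len = Math.min(MAX_BLOCK, pbBytes.length - from);
            out.putInt(len);
            out.put(pbBytes, from, len);
        }
        return out.array();
    }

    /** Validates the token and SerialNo, then returns the re-joined PB bytes of a complete frame. */
    static byte[] decode(byte[] frame, int expectedSerialNo) {
        ByteBuffer in = ByteBuffer.wrap(frame);
        if (in.getInt() != BEGIN_TOKEN) {
            throw new IllegalArgumentException("not a TubeMQ frame: bad MsgToken");
        }
        if (in.getInt() != expectedSerialNo) {
            throw new IllegalArgumentException("unexpected SerialNo");
        }
        int listSize = in.getInt();
        byte[] joined = new byte[0];
        for (int i = 0; i < listSize; i++) {
            int len = in.getInt();
            int oldLen = joined.length;
            joined = Arrays.copyOf(joined, oldLen + len);
            in.get(joined, oldLen, len);
        }
        return joined;
    }
}
```

Because no total length is written, a reader on a real socket has to consume the blocks one by one, as the decode loop does; the joined bytes are then handed to the PB parser.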
PB format encoding
The PB protocol is divided into three parts:
- RPC framework definition: RPC.proto
- Master-related message encoding: MasterService.proto
- Broker-related message encoding: BrokerService.proto
These protocol definition files are directly compiled through PB to obtain the corresponding implementation class. Taking RPC.proto as an example, RPC.proto defines 6 structures, which are divided into 2 types:
- Request message 
- Response message, including normal response return and response return in case of exception 
The request message encoding and response message decoding can be implemented by referring to the NettyClient.java class. The definitions in this part leave some room for improvement (see TUBEMQ-109 for details), but for compatibility reasons they will only be replaced gradually. Implementations based on the current proto version will interoperate without problems at least until version 1.0.0, for which a new protocol will be considered. Each SDK should therefore leave room for change in its protocol implementation module.
Taking the request message filling as an example, the RpcConnHeader and other related structures are as follows:
message RpcConnHeader {
    required int32 flag = 1;
    optional int64 traceId = 2;
    optional int64 spanId = 3;
    optional int64 parentId = 4;
}
message RequestHeader {
    optional int32 serviceType = 1;
    optional int32 protocolVer = 2;
}
message RequestBody {
    required int32 method = 1;
    optional int64 timeout = 2;
    optional bytes request = 3;
}
Among them:
- The flag field of RpcConnHeader marks whether the message is a request. The following three fields carry message-tracing information and are not used at present;
- RequestHeader contains the service type and the protocol version;
- RequestBody contains the request method, the timeout, and the request content. timeout is the maximum time a request may wait on the server between being received and actually being processed; a request that waits longer is discarded. The current default is 10 seconds.
The specific implementation of request filling is shown in the following part:
RequestWrapper requestWrapper =
                new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface),
                        RpcProtocol.RPC_PROTOCOL_VERSION,
                        RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST,
                        requestTimeout); // request timeout
At this point, the introduction to the protocol format definition of TubeMQ is complete. Next, we will complete data production and consumption with messages composed of these protocol formats.
Client request response interaction
Producer production interaction diagram
The Producer uses four request/response pairs in total: register, heartbeat, and close with the Master node, and message sending with the Broker node:

From here we can see that the Producer obtains metadata information such as the partition list corresponding to the specified Topic from the Master. After obtaining this information, it selects the partition according to the client's rules and sends the message to the corresponding Broker.
Points to note when implementing a Producer in other languages:
- The Master has active and standby nodes, and only the active node provides service. When the Producer connects to a standby node, it receives a StandbyException response. The client then needs to try the other Master nodes until it registers with the active one;
- When the connection to the Master fails during production, for example through a timeout or a passively closed link, the Producer must send a new registration request to the Master;
- After receiving the Topic metadata from the Master, the Producer must connect to the Brokers in advance, so that a burst of connection requests during data production does not degrade message reporting performance;
- The connections between the Producer and the Brokers must be monitored for anomalies: in long-running scenarios failed Broker nodes must be detected, and links that have not sent messages for a long time must be recycled, to improve the stability of the data reporting service.
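The first point, finding the active Master, can be sketched as a loop over the configured Master addresses. This is only an illustration: the StandbyException class below is a local stand-in for the StandbyException response, and registerFn is a hypothetical hook that would perform the actual register RPC.

```java
import java.util.List;
import java.util.function.Function;

final class MasterFailoverSketch {
    /** Hypothetical stand-in for the StandbyException response described above. */
    static final class StandbyException extends RuntimeException {
        StandbyException(String msg) {
            super(msg);
        }
    }

    /**
     * Tries each Master address in turn and returns the first one that accepts
     * the registration. registerFn is assumed to throw StandbyException when
     * the contacted node is a standby.
     */
    static String registerWithActiveMaster(List<String> masters,
                                           Function<String, Boolean> registerFn) {
        for (String addr : masters) {
            try {
                if (registerFn.apply(addr)) {
                    return addr; // active Master found
                }
            } catch (StandbyException e) {
                // Standby node: move on to the next address.
            }
        }
        throw new IllegalStateException("no active Master among " + masters);
    }
}
```

The same routine can be reused for re-registration after a Master connection failure, since the active node may have changed in the meantime.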
Consumer consumption interaction diagram
The Consumer uses eight request/response pairs in total: register, heartbeat, and deregister with the Master; and register, deregister, heartbeat, pull messages, and confirm messages with the Broker. Registration and deregistration with the Broker share one command name and are distinguished by different status codes:

From the example in the figure above, we can see that:
- When the Consumer registers with the active Master node, the Master does not return metadata to the Consumer at that point, but in a subsequent heartbeat. The reason is that the Consumer in the example uses server-side load balancing and has to wait for the server to distribute the consumption partition information before it knows which partitions to consume;
- The Consumer registers with and deregisters from the Broker. The reason is that partitions are consumed exclusively: at any time a partition can be consumed by only one consumer in the same group. The client obtains the right to consume a partition through the registration operation;
- Message pulling and consumption confirmation must occur in pairs. This second confirmation step keeps repeated consumption to a minimum and prevents data from being missed in abnormal situations.
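The pairing of pull and confirm in the last point can be sketched as below. PartitionSession is a hypothetical interface introduced only for this illustration, not a TubeMQ API, and the meaning given to a negative confirmation (the batch may be delivered again) is an assumption.

```java
import java.util.List;
import java.util.function.Consumer;

final class PullConfirmSketch {
    /** Hypothetical view of one registered partition; not a TubeMQ API. */
    interface PartitionSession {
        List<String> pull();            // pull request to the Broker

        void confirm(boolean consumed); // confirm request paired with the last pull
    }

    /** Runs one pull/confirm round and reports whether the batch was consumed. */
    static boolean consumeOnce(PartitionSession session, Consumer<String> handler) {
        List<String> batch = session.pull();
        boolean consumed = false;
        try {
            for (String msg : batch) {
                handler.accept(msg);
            }
            consumed = true;
        } catch (RuntimeException e) {
            // Processing failed: confirm with consumed=false
            // (assumed here to let the batch be delivered again).
        } finally {
            // Every pull is followed by exactly one confirm.
            session.confirm(consumed);
        }
        return consumed;
    }
}
```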
The RPC interaction process between the client and the server
As shown below:

- When the client interacts with the TubeMQ server, it must maintain local storage of the sent request message until the RPC times out or a response message is received; 
- The client associates the SerialNo value carried in the response message with the previously cached sent request record; 
- After receiving the Broker and Topic metadata information from the Master, the client must save it locally and update it with the latest metadata, and report the cached metadata to the Master regularly; 
- The client must maintain heartbeats with the Master or Broker. If the Master reports a registration timeout error, the client must re-register;
- Connections are established per Broker. For different client objects in the same process, the business may choose whether connections are established per object or per process.
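The first two points, caching each sent request until its response or timeout and matching responses by SerialNo, can be sketched as follows. The class and method names are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and SerialNo is assumed to fit in an int.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class PendingRequestsSketch {
    private static final class Pending {
        final CompletableFuture<byte[]> future = new CompletableFuture<>();
        final long deadlineMs;

        Pending(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final AtomicInteger serialGen = new AtomicInteger();
    private final Map<Integer, Pending> pending = new ConcurrentHashMap<>();

    /** Allocates a SerialNo for an outgoing request and caches it until answered or expired. */
    int track(long nowMs, long timeoutMs) {
        int serialNo = serialGen.incrementAndGet();
        pending.put(serialNo, new Pending(nowMs + timeoutMs));
        return serialNo;
    }

    /** Returns the future of a still-pending request, or null if none is cached. */
    CompletableFuture<byte[]> futureOf(int serialNo) {
        Pending p = pending.get(serialNo);
        return p == null ? null : p.future;
    }

    /** Called for each decoded response; returns false if no request is waiting for it. */
    boolean onResponse(int serialNo, byte[] body) {
        Pending p = pending.remove(serialNo);
        return p != null && p.future.complete(body);
    }

    /** Called periodically; fails and removes every request whose RPC timeout has passed. */
    int expire(long nowMs) {
        int expired = 0;
        for (Map.Entry<Integer, Pending> e : pending.entrySet()) {
            Pending p = e.getValue();
            if (p.deadlineMs <= nowMs && pending.remove(e.getKey(), p)) {
                p.future.completeExceptionally(
                        new TimeoutException("RPC timed out, serialNo=" + e.getKey()));
                expired++;
            }
        }
        return expired;
    }
}
```

A response whose SerialNo is no longer cached (for example because it arrived after the timeout) is simply reported as unmatched, and the caller can drop it.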
Producer registers with the Master
message RegisterRequestP2M {
    required string clientId = 1;
    repeated string topicList = 2;
    required int64 brokerCheckSum = 3;
    required string hostName = 4;
    optional MasterCertificateInfo authInfo = 5;
    optional string jdkVersion = 6;
    optional ApprovedClientConfig appdConfig = 7;
}
message RegisterResponseM2P {
    required bool success = 1;
    required int32 errCode = 2;
    required string errMsg = 3;
    required int64 brokerCheckSum = 4;
    repeated string brokerInfos = 5;
    optional MasterAuthorizedInfo authorizedInfo = 6;
    optional ApprovedClientConfig appdConfig = 7;
}
- clientId: identifies the Producer object. The ID is constructed when the Producer starts and remains valid for the Producer's life cycle. The current construction rule in the Java SDK is:
ClientId = consumerGroup + "_"
    + AddressUtils.getLocalAddress() + "_" // local IP (IPv4)
    + pid + "_" // process ID
    + timestamp + "_" // timestamp
    + counter + "_" // incrementing counter
    + consumerType + "_" // type of consumer, including Pull and Push
    + clientVersion; // client version
It is recommended that SDKs in other languages include the same markers to facilitate troubleshooting;
- topicList: the list of topics published by the user. The Producer provides the initial list of topics to publish during initialization. While running, the business may also add new topics or remove published topics later through the Publish function;
- brokerCheckSum: the check value of the Broker metadata saved locally by the client. The Producer has no such data at initial startup, so the value is -1. The SDK must carry the last brokerCheckSum value in every request, and the Master compares it to decide whether the client's metadata needs to be updated;
- hostName: the IPv4 address of the machine where the Producer runs;
- success: whether the operation succeeded: true on success, false on failure;
- errCode: error code, to be combined with errMsg to determine the specific cause of the error;
- errMsg: error message. If the request fails, the SDK needs to print the specific error message;
- authInfo: authentication and authorization information. This field is filled in when the user configuration enables the authentication process. When authentication is required, the SDK reports a signature generated from the username and password. While running, for example during heartbeats, the SDK reports the username and password signature again if the Master forces authentication; otherwise it authenticates with the authorization token provided by the Master in the previous interaction. The same authorization token is also carried in messages produced to the Broker.
- brokerInfos: Broker metadata information. This field mainly contains the Broker information list of the entire cluster that the Master feeds back to the Producer. Its format is as follows:
public BrokerInfo(String strBrokerInfo, int brokerPort) {
    // Attributes are separated by ATTR_SEP: brokerId, host, port
    String[] strBrokers =
            strBrokerInfo.split(TokenConstants.ATTR_SEP);
    this.brokerId = Integer.parseInt(strBrokers[0]);
    this.host = strBrokers[1];
    // Use the default port unless the entry carries its own
    this.port = brokerPort;
    if (!TStringUtils.isBlank(strBrokers[2])) {
        this.port = Integer.parseInt(strBrokers[2]);
    }
    this.buildStrInfo();
}
- authorizedInfo: authorization information provided by the Master, in the following format:
message MasterAuthorizedInfo {
    required int64 visitAuthorizedToken = 1;
    optional string authAuthorizedToken = 2;
}
- visitAuthorizedToken: Access authorization token, to prevent the client from bypassing the Master to access the Broker node. The SDK needs to save this information locally and carry this information when accessing the Broker in the future. If this field changes in the subsequent heartbeat, the locally cached data of this field needs to be updated; 
- authAuthorizedToken: Authorization token that has passed authentication. If there is data in this field, the SDK needs to save it and carry this field information when accessing the Master and Broker in the future. If this field changes in the subsequent heartbeat, the locally cached data of this field needs to be updated. 
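The brokerCheckSum handshake described above can be sketched on the client side as a small metadata cache. This is an illustration under assumptions: each brokerInfos entry is taken to look like brokerId:host:port with ":" as the attribute separator, a response is assumed to carry the full Broker list whenever its check value differs from the local one, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BrokerMetaCacheSketch {
    // No metadata at startup, so -1 is reported in the first registration.
    private long brokerCheckSum = -1L;
    private Map<Integer, String> brokers = new HashMap<>(); // brokerId -> "host:port"

    /** Value to place in the brokerCheckSum field of the next request. */
    long checkSumToReport() {
        return brokerCheckSum;
    }

    /**
     * Applies brokerCheckSum and brokerInfos from a Master response.
     * Returns true if the local Broker list was replaced.
     */
    boolean update(long newCheckSum, List<String> brokerInfos, int defaultPort) {
        if (newCheckSum == brokerCheckSum) {
            return false; // metadata unchanged, keep the cached list
        }
        Map<Integer, String> parsed = new HashMap<>();
        for (String info : brokerInfos) {
            String[] parts = info.split(":"); // assumed attribute separator
            int port = (parts.length > 2 && !parts[2].trim().isEmpty())
                    ? Integer.parseInt(parts[2].trim())
                    : defaultPort;
            parsed.put(Integer.parseInt(parts[0]), parts[1] + ":" + port);
        }
        brokers = parsed;
        brokerCheckSum = newCheckSum;
        return true;
    }

    String addressOf(int brokerId) {
        return brokers.get(brokerId);
    }
}
```

The tokens in authorizedInfo can be cached in the same way: store the latest value from each response and overwrite it whenever a later heartbeat returns a different one.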
Producer to Master Heartbeat
message HeartRequestP2M {
    required string clientId = 1;
    required int64 brokerCheckSum = 2;
    required string hostName = 3;
    repeated string topicList = 4;
    optional MasterCertificateInfo authInfo = 5;
    optional ApprovedClientConfig appdConfig = 6;
}
message HeartResponseM2P {
    required bool success = 1;
    required int32 errCode = 2;
    required string errMsg = 3;
    required int64 brokerCheckSum = 4;
    /* brokerId:host:port-topic:partitionNum */
    repeated string topicInfos = 5;
    repeated string brokerInfos = 6;
    optional bool requireAuth = 7;
    optional MasterAuthorizedInfo authorizedInfo = 8;
    optional ApprovedClientConfig appdConfig = 9;
}
- topicInfos: Topic metadata information published by the SDK, including partition information and the Broker node where it is located. The specific decoding method is as follows:
public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo(
        Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) {
    List<TopicInfo> topicList = new ArrayList<>();
    Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();
    if (strTopicInfos == null || strTopicInfos.isEmpty()) {
        return new Tuple2<>(topicMaxSizeInBMap, topicList);
    }
    String[] strInfo;
    String[] strTopicInfoSet;
    String[] strTopicInfo;
    BrokerInfo brokerInfo;
    for (String info : strTopicInfos) {
        if (info == null || info.isEmpty()) {
            continue;
        }
        info = info.trim();
        // Segments: topic name, per-Broker entries, optional max size
        strInfo = info.split(TokenConstants.SEGMENT_SEP, -1);
        strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
        for (String s : strTopicInfoSet) {
            // Each entry: brokerId followed by two numeric attributes
            strTopicInfo = s.split(TokenConstants.ATTR_SEP);
            brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));
            // Entries for Brokers unknown to the client are skipped
            if (brokerInfo != null) {
                topicList.add(new TopicInfo(brokerInfo,
                        strInfo[0], Integer.parseInt(strTopicInfo[1]),
                        Integer.parseInt(strTopicInfo[2]), true, true));
            }
        }
        if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) {
            continue;
        }
        try {
            topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2]));
        } catch (Throwable e) {
            // Ignore a size value that cannot be parsed
        }
    }
    return new Tuple2<>(topicMaxSizeInBMap, topicList);
}
- requireAuth: indicates that the previous authorized access code (authAuthorizedToken) of the Master has expired, requiring the SDK to carry the signature information of the username and password for authentication in the next request; 
Producer to Master Close and Exit
message CloseRequestP2M{
    required string clientId = 1;
    optional MasterCertificateInfo authInfo = 2;
}
message CloseResponseM2P{
    required bool success = 1;
    required int32 errCode = 2;
    required string errMsg = 3;
}
Note that if authentication is enabled, the close request is authenticated as well, to avoid external interference.
Producer sends messages to Broker
The content of this section is mainly related to the definition of Message:
message SendMessageRequestP2B {
    required string clientId = 1;
    required string topicName = 2;
    required int32 partitionId = 3;
    required bytes data = 4;
    required int32 flag = 5;
    required int32 checkSum = 6;
    required int32 sentAddr = 7;
    optional string msgType = 8;
    optional string msgTime = 9;
    optional AuthorizedInfo authInfo = 10;
}
message SendMessageResponseB2P {
    required bool success = 1;
    required int32 errCode = 2;
    required string errMsg = 3;
    optional bool requireAuth = 4;
    optional int64 messageId = 5;
    optional int64 appendTime = 6;
    optional int64 appendOffset = 7;
}
- data: the binary byte stream of the Message, built as follows:
private byte[] encodePayload(final Message message) {
    final byte[] payload = message.getData();
    final String attribute = message.getAttribute();
    // Without attributes, the payload is sent as is
    if (TStringUtils.isBlank(attribute)) {
        return payload;
    }
    // Otherwise: 4-byte attribute length, attribute bytes, payload
    byte[] attrData = StringUtils.getBytesUtf8(attribute);
    final ByteBuffer buffer =
            ByteBuffer.allocate(4 + attrData.length + payload.length);
    buffer.putInt(attrData.length);
    buffer.put(attrData);
    buffer.put(payload);
    return buffer.array();
}
- sentAddr: the IPv4 address of the machine where the SDK runs, converted into a 32-bit numeric ID;
- msgType: the stream value to which the message belongs, used for filtered consumption;
- msgTime: the time at which the SDK sends the message. Its value is the one filled in by putSystemHeader when the Message is constructed;
- requireAuth: whether authentication is required for data production to the Broker. For performance reasons it is not in effect at present. The authAuthorizedToken value carried in the sent message is the value provided by the Master and changes whenever the Master changes it.
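The conversion behind sentAddr can be sketched as below. The byte order is an assumption (first octet in the most significant byte); an SDK should follow whatever order the Java SDK actually uses. The class and method names are hypothetical.

```java
final class SentAddrSketch {
    /** Packs a dotted-quad IPv4 string into a 32-bit int, first octet in the high byte. */
    static int ipv4ToInt(String ip) {
        String[] octets = ip.trim().split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        int value = 0;
        for (String octet : octets) {
            int b = Integer.parseInt(octet);
            if (b < 0 || b > 255) {
                throw new IllegalArgumentException("octet out of range: " + octet);
            }
            value = (value << 8) | b;
        }
        return value;
    }

    /** Inverse conversion, useful when logging or debugging the sentAddr field. */
    static String intToIpv4(int value) {
        return ((value >>> 24) & 0xFF) + "." + ((value >>> 16) & 0xFF) + "."
                + ((value >>> 8) & 0xFF) + "." + (value & 0xFF);
    }
}
```

Since the field is a signed int32, addresses whose first octet is 128 or higher appear as negative numbers; the bit pattern is what matters.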
Partition Load Balancing
The InLong TubeMQ module currently supports two balancing modes: server-side load balancing and client-side balancing. The business can choose different balancing methods according to needs.
The server balancing process is managed and maintained by the server, and the requirements for the Consumer consumption side are relatively low. The load balancing process is as follows:
- After the Master process is started, the load balancing thread balancerChore is started. BalancerChore periodically checks the currently registered consumer groups and performs load balancing. In simple terms, the process is to evenly distribute the partitions subscribed by the consumer group to the registered clients, and regularly check whether the current number of partitions of the client exceeds the predetermined number. If it exceeds, the excess partitions are split to other clients with a smaller number. 
- The Master checks whether the current consumer group needs to be load balanced. If so, all partitions of the Topic set subscribed by the consumer group and all consumer IDs of the group are sorted, and the number of partitions is divided by the number of clients in the group (quotient and remainder) to obtain the maximum number of partitions each client may subscribe to. Partitions are then allocated to each client, and the partition information is carried in the heartbeat response to the consumer. If a client currently holds more partitions than allowed, it is given a partition release instruction to release the partition, and the consumer that is to receive the partition is given a partition allocation instruction telling it which partition it has been allocated. The instruction is defined as follows:
message EventProto{
    optional int64 rebalanceId = 1;
    optional int32 opType = 2;
    optional int32 status = 3;
    /* consumerId@group-brokerId:host:port-topic:partitionId */
    repeated string subscribeInfo = 4;
}
Among them:
- rebalanceId: self-incrementing long value ID, indicating the round of load balancing;
- subscribeInfo: indicates the assigned partition information;
- opType: operation code, whose values are defined in EventType. The operation codes implemented so far are the four that release or establish connections; the ONLY_xxx variants are not yet handled differently from their plain counterparts. After receiving the load balancing information carried in the heartbeat, the Consumer performs the corresponding operation according to this value:
switch (event.getType()) {
    // Release the connection to the Broker for the listed partitions
    case DISCONNECT:
    case ONLY_DISCONNECT:
        disconnectFromBroker(event);
        rebalanceResults.put(event);
        break;
    // Establish the connection to the Broker for the listed partitions
    case CONNECT:
    case ONLY_CONNECT:
        connect2Broker(event);
        rebalanceResults.put(event);
        break;
    case REPORT:
        reportSubscribeInfo();
        break;
    case STOPREBALANCE:
        break;
    default:
        throw new TubeClientException(strBuffer
                .append("Invalid rebalance opCode:")
                .append(event.getType()).toString());
}
- status: indicates the status of the event, defined in EventStatus:
public enum EventStatus {
    /**
     * To be processed state.
     */
    TODO(0, "To be processed"),
    /**
     * On processing state.
     */
    PROCESSING(1, "Being processed"),
    /**
     * Processed state.
     */
    DONE(2, "Process Done"),
    /**
     * Unknown state.
     */
    UNKNOWN(-1, "Unknown event status"),
    /**
     * Failed state.
     */
    FAILED(-2, "Process failed");
}
 
- When the Master constructs a load balancing task, it sets the instruction status to TODO. When the client's heartbeat request arrives, the Master writes the task into the response message and sets the instruction status to PROCESSING. The client receives the load balancing instruction from the heartbeat response, performs the actual connection or disconnection, sets the instruction status to DONE once the operation is complete, and sends the result back to the Master with the next heartbeat request;
- Consumer operation: after the Consumer receives the metadata returned by the Master, it establishes or releases connections (see the opType description above) and then returns the event processing result to the Master, which completes the cycle of receiving the task, executing it, and reporting the result. Note that load balancing registration is a best-effort operation. If a consumer initiates a connection while the consumer that previously held the partition has not yet exited, it receives a PARTITION_OCCUPIED response, and the partition is removed from the attempt queue. The previous holder of the partition still performs its release after receiving the corresponding response, so the consumer assigned to the partition in the next round of load balancing can register on it successfully.
At this point, the consumption balancing operation on the consumer side is completed, and the consumer registers and consumes data after obtaining the partition information.
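The "sort, then divide and take the remainder" allocation step performed by the Master can be sketched as follows. This is an illustration of the general technique only: partition keys and consumer IDs are plain strings, and giving the extra partitions to the first consumers in sort order is an assumption about tie-breaking, not a description of the Master's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EvenAllocationSketch {
    /** Distributes sorted partitions over sorted consumers as evenly as possible. */
    static Map<String, List<String>> allocate(List<String> partitions, List<String> consumers) {
        List<String> parts = new ArrayList<>(partitions);
        List<String> ids = new ArrayList<>(consumers);
        Collections.sort(parts);
        Collections.sort(ids);
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (ids.isEmpty()) {
            return result;
        }
        int quota = parts.size() / ids.size();      // partitions every consumer gets
        int remainder = parts.size() % ids.size();  // consumers that get one extra
        int next = 0;
        for (int i = 0; i < ids.size(); i++) {
            int count = quota + (i < remainder ? 1 : 0);
            result.put(ids.get(i), new ArrayList<>(parts.subList(next, next + count)));
            next += count;
        }
        return result;
    }
}
```

With five partitions and two consumers, for example, the quota is 2 and the remainder is 1, so the first consumer in sort order receives three partitions and the second receives two. Comparing such a target allocation with the partitions each client currently holds yields the release and allocation instructions described above.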