Version: Next

Java SDK

Create real-time synchronization task

Create a task on the Dashboard or through the command line, and use Auto Push (autonomous push) as the data source type.

Import Java SDK

The SDK library needs to be imported into the project before the SDK can be used. It can be obtained in either of the following two ways:

  • Get the source code, compile it yourself, and deploy the SDK package to your local Maven repository; see How to Build.
  • Import it through a Maven dependency, where ${siteVariables.inLongVersion} stands for the InLong version in use:
    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>dataproxy-sdk</artifactId>
        <version>${siteVariables.inLongVersion}</version>
    </dependency>

Data report process

After importing the SDK, you can instantiate a TcpMsgSender object and call the synchronous (sendMessage()) or asynchronous (asyncSendMessage()) interface to report single or multiple (batch) messages; see TcpClientExample.java. The overall process consists of the following three steps:

Initialize SDK

From the demo code, we can see that the client initialization is mainly done in the getMessageSender() function:

public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps,
        String managerAddr, String managerPort, String inlongGroupId, int msgType,
        boolean useLocalMetaConfig, String configBasePath) {
    TcpMsgSender messageSender = null;
    try {
        // Build the sender configuration.
        // "admin" and "inlong" are the default username and password of InLong Manager;
        // replace them according to your environment in actual use.
        TcpMsgSenderConfig tcpConfig = new TcpMsgSenderConfig(visitMgrByHttps, managerAddr,
                Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong");
        // (Optional) Set the local path for saving the configuration. By default, the SDK
        // creates a "/.inlong/" directory under the current user's working directory.
        tcpConfig.setMetaStoreBasePath(configBasePath);
        // (Optional) Set whether to use only the locally saved configuration. Default: not used.
        tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig);
        // (Optional) Set the message type to send. By default, data is sent in binary format.
        tcpConfig.setSdkMsgType(MsgType.valueOf(msgType));
        // Set the request timeout to 20000 ms (the default is 10000 ms).
        tcpConfig.setRequestTimeoutMs(20000L);
        // Build the sender object.
        messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig);
    } catch (Throwable ex) {
        System.out.println("Get MessageSender throw exception, " + ex);
    }
    // Returns null if initialization failed, so callers must check the result.
    return messageSender;
}
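The demo passes managerPort as a String and converts it with Integer.parseInt(). A malformed value throws NumberFormatException, which the demo swallows in its catch (Throwable) block before returning null. The following is a minimal sketch of validating the port before building TcpMsgSenderConfig. The class ManagerPortParser is hypothetical and not part of the SDK, and the 1 to 65535 bound is simply the valid TCP port range.

```java
import java.util.Objects;

/** Hypothetical helper, not part of the InLong SDK. */
public final class ManagerPortParser {

    private ManagerPortParser() {
    }

    /**
     * Parses the Manager port string, failing with a descriptive message
     * instead of a bare NumberFormatException.
     */
    public static int parseManagerPort(String managerPort) {
        Objects.requireNonNull(managerPort, "managerPort must not be null");
        final int port;
        try {
            port = Integer.parseInt(managerPort.trim());
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                    "managerPort is not a number: '" + managerPort + "'", ex);
        }
        // Reject values outside the valid TCP port range.
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("managerPort out of range 1-65535: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        // Example value only; use your InLong Manager port.
        System.out.println(parseManagerPort(" 8083 "));
    }
}
```

For example, Integer.parseInt(managerPort) in getMessageSender() could be replaced with ManagerPortParser.parseManagerPort(managerPort), so that the logged exception names the offending value.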

TcpMsgSenderConfig configuration

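| Parameter name | Description | Default value |
|---|---|---|
| inlongGroupId | ID of the InLong group | None; required (must not be null) |
| inlongStreamId | ID of the InLong stream | None; required (must not be null) |
| username | Username for accessing InLong Manager | None; required (must not be null) |
| password | Password for accessing InLong Manager | None; required (must not be null) |
| visitMgrByHttps | Protocol used to request InLong Manager: true for HTTPS, false for HTTP | - |
| useLocalMetaConfig | Whether to read the DataProxy IP list from the local configuration | false |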

Call the send interface to report data

The SDK's data send interface is thread-safe and supports sending single or multiple messages, either synchronously or asynchronously. The following demo sends a single message synchronously, without any property information:

public void sendTcpMessage(TcpMsgSender sender,
        String inlongGroupId, String inlongStreamId, long dt, String messageBody) {
    ProcessResult procResult = new ProcessResult();
    try {
        // Send a single message synchronously; the properties argument is null,
        // so the message carries no property information.
        sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId,
                dt, null, messageBody.getBytes(StandardCharsets.UTF_8)), procResult);
    } catch (Throwable ex) {
        System.out.println("Message sent throw exception, " + ex);
        return;
    }
    // procResult holds the outcome of the send (see the error codes below).
    System.out.println("Message sent result = " + procResult);
}

You can also choose a different send interface according to your business needs. For details of each interface, refer to the definitions in the TcpMsgSender interface file, which documents them in detail.
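Because the send interface is thread-safe, many producer threads can share one sender instead of each creating its own. The sketch below shows that pattern with the standard ExecutorService. MessageSink is a hypothetical stand-in for TcpMsgSender and is not the SDK interface. RecordingSink is a test stub that only records message bodies, and the group and stream IDs are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class SharedSenderDemo {

    /** Hypothetical stand-in for TcpMsgSender; not part of the InLong SDK. */
    public interface MessageSink {
        boolean send(String groupId, String streamId, long dt, byte[] body);
    }

    /** Test stub that records message bodies; safe to call from many threads. */
    public static final class RecordingSink implements MessageSink {
        public final ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();

        @Override
        public boolean send(String groupId, String streamId, long dt, byte[] body) {
            received.add(new String(body, StandardCharsets.UTF_8));
            return true;
        }
    }

    /** Sends messageCount messages from threadCount threads through ONE shared sink. */
    public static void sendConcurrently(MessageSink sink, int threadCount, int messageCount)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                final String body = "msg-" + i;
                pool.execute(() -> sink.send("test_group", "test_stream",
                        System.currentTimeMillis(), body.getBytes(StandardCharsets.UTF_8)));
            }
        } finally {
            // Stop accepting tasks and wait for the queued sends to finish.
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingSink sink = new RecordingSink();
        sendConcurrently(sink, 4, 100);
        System.out.println("sent " + sink.received.size() + " messages");
    }
}
```

In a real application, the shared object would be the TcpMsgSender returned by getMessageSender(), and each task would call one of its send methods.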

Close SDK

Because Sender objects are created and reused through the sender factory, shut down the data reporting service by calling the factory's shutdownAll() method when the application exits:

senderFactory.shutdownAll();
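The sender should be a resident object (see the warnings below), and shutdownAll() should run once on exit, so one option is to wrap both in a small holder. The sketch below is hypothetical and not part of the SDK. ResidentResource creates its instance lazily on first use and reuses it afterwards. It runs its shutdown action at most once, optionally from a JVM shutdown hook registered with Runtime.addShutdownHook().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of the InLong SDK): creates a resource once,
 * reuses it, and runs its shutdown action exactly once.
 */
public final class ResidentResource<T> implements AutoCloseable {
    private final Supplier<T> factory;
    private final Runnable shutdownAction;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile T instance;

    public ResidentResource(Supplier<T> factory, Runnable shutdownAction) {
        this.factory = factory;
        this.shutdownAction = shutdownAction;
    }

    /** Lazily creates the instance on first use (double-checked locking). */
    public T get() {
        if (closed.get()) {
            throw new IllegalStateException("already shut down");
        }
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    /** Registers a JVM shutdown hook so the resource is released on normal exit. */
    public ResidentResource<T> closeOnJvmExit() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::close, "resident-resource-shutdown"));
        return this;
    }

    /** Runs the shutdown action at most once, even if called repeatedly. */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            shutdownAction.run();
        }
    }
}
```

Under these assumptions, the factory argument would be a lambda calling getMessageSender(), and the shutdown action would be senderFactory::shutdownAll. Shutdown hooks do not run if the JVM is terminated abruptly, for example with SIGKILL.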

Warning

  • A MessageSender object is initialized for a specific inlongGroupId, so different inlongGroupIds can use different MessageSender objects, and multiple MessageSender objects can be created in the same process.
  • The SDK provides three network interaction modes: TCP, HTTP, and UDP. Examples of all three are provided (see TcpClientExample.java, HttpClientExample.java, and UdpClientExample.java), and you can initialize whichever type of MessageSender object suits your needs.
  • The SDK involves complex network interactions, so MessageSender should be used as a resident object. Avoid frequently initializing and shutting down MessageSender: doing so incurs a large resource overhead and affects the timeliness of data reporting.
  • The SDK does not resend failed messages. When reporting data with the SDK, if a send fails, you need to decide whether to resend it according to your own needs.
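The SDK does not resend failed messages, so a caller that needs at-least-once delivery has to add its own retry loop. The sketch below is an assumption, not SDK behavior. It retries a send action with exponential backoff, which also follows the error code table's advice to pause when ErrorCode.CONNECTION_UNWRITABLE is returned. The send action is modeled as a BooleanSupplier that performs one send and returns true on success. Reading success from ProcessResult is left to the caller, because that part depends on the SDK API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of the InLong SDK. */
public final class RetryingSend {

    private RetryingSend() {
    }

    /**
     * Calls sendOnce up to maxAttempts times, sleeping between failed attempts with
     * exponential backoff (initialBackoffMs, then 2x, 4x, ... capped at maxBackoffMs).
     * Returns true as soon as one attempt succeeds, false if every attempt fails.
     */
    public static boolean sendWithRetry(BooleanSupplier sendOnce, int maxAttempts,
            long initialBackoffMs, long maxBackoffMs) throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        if (initialBackoffMs < 0) {
            throw new IllegalArgumentException("initialBackoffMs must be >= 0");
        }
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sendOnce.getAsBoolean()) {
                return true;
            }
            // Do not sleep after the final failed attempt.
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        return false;
    }
}
```

A retry after ErrorCode.SEND_WAIT_TIMEOUT may produce a duplicate if the first request actually reached DataProxy. When retries are enabled, downstream consumers should therefore tolerate duplicates.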

Error Code Introduction

Common error codes are as follows.

| Code | Explanation | Remarks |
|---|---|---|
| ErrorCode.OK | Sent successfully | |
| ErrorCode.SDK_CLOSED | The SDK has been closed | |
| ErrorCode.FETCH_PROXY_META_FAILURE | The SDK failed to obtain metadata | |
| ErrorCode.EMPTY_ACTIVE_NODE_SET | No active node is available | |
| ErrorCode.EMPTY_WRITABLE_NODE_SET | No node is writable | |
| ErrorCode.NO_VALID_REMOTE_NODE | No connection is available | In this case, it is recommended to increase the number of available connections (see setAliveConnections). |
| ErrorCode.REPORT_INFO_EXCEED_MAX_LEN | The reported data exceeds the maximum allowed length | |
| ErrorCode.CONNECTION_UNAVAILABLE | The connection is unavailable | |
| ErrorCode.CONNECTION_BREAK | The connection is broken | |
| ErrorCode.CONNECTION_UNWRITABLE | The connection is not writable | This is usually caused by the client producing data faster than the server can respond. It is recommended to pause briefly between sends to avoid blocking. |
| ErrorCode.CONNECTION_WRITE_EXCEPTION | An exception occurred while writing the report request | |
| ErrorCode.SEND_WAIT_INTERRUPT | Waiting for the response was interrupted | |
| ErrorCode.SEND_WAIT_TIMEOUT | The request timed out waiting for a response | |
| ErrorCode.SEND_ON_EXCEPTION | An exception occurred while sending the request | |
| ErrorCode.UNKOWN_ERROR | Unknown error | |
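The remarks above suggest different reactions to different codes. The following sketch encodes one possible triage policy. The Advice categories and the mapping are hypothetical choices for illustration, not behavior defined by the SDK. The method takes the constant name as a String so that it compiles without the SDK on the classpath. For example, you could pass the result of name(), assuming ErrorCode is a Java enum.

```java
/** Hypothetical triage helper, not part of the InLong SDK. */
public final class ErrorCodeTriage {

    /** Hypothetical handling categories; the SDK itself does not define these. */
    public enum Advice { SUCCESS, BACK_OFF_THEN_RETRY, DO_NOT_RETRY, APPLICATION_DECIDES }

    private ErrorCodeTriage() {
    }

    /**
     * Maps an ErrorCode constant name to a suggested handling category,
     * following the remarks in the error code table. Illustrative policy only.
     */
    public static Advice triage(String errorCodeName) {
        switch (errorCodeName) {
            case "OK":
                return Advice.SUCCESS;
            case "CONNECTION_UNWRITABLE":
                // The producer is outpacing the server: pause briefly, then resend.
                return Advice.BACK_OFF_THEN_RETRY;
            case "REPORT_INFO_EXCEED_MAX_LEN":
            case "SDK_CLOSED":
                // Resending the same oversized payload, or sending through a closed
                // SDK, cannot succeed without changing the data or the sender first.
                return Advice.DO_NOT_RETRY;
            default:
                // Includes NO_VALID_REMOTE_NODE (consider raising setAliveConnections)
                // and the timeout/exception codes: the SDK does not resend, so the caller decides.
                return Advice.APPLICATION_DECIDES;
        }
    }
}
```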

TcpMsgSenderConfig Configuration Introduction

| Parameter | Explanation | Adjustment suggestion |
|---|---|---|
| setAliveConnections(int aliveConnections) | Sets the number of DataProxy connections. Default: 7. | 1) If the data volume is large or the workload is delay-sensitive, increase this value appropriately; 2) adjust it according to the size of the DataProxy cluster, e.g. for a cluster of 30 nodes it can be set to 5 ~ 10; 3) an empirical value is 15 ~ 20. |
| setSendBufferSize(int sendBufferSize) | Sets the size of the SDK's internal buffer queue for async sending. The buffer holds packets that have been sent but have not yet received an Ack from DataProxy. Once the buffer reaches this threshold, further sends fail with ErrorCode.CONNECTION_UNWRITABLE. Default: 16 * 1024 * 1024 bytes (16 MiB). | 1) Normally there is no need to adjust this parameter; 2) when the data volume is very large or the DataProxy load is high, it can be increased appropriately, but not too much, as an overly large buffer may cause an OOM (out of memory) error. |
| setConnectTimeoutMs(int connectTimeoutMs) | Sets the connection timeout. Unit: ms. Default: 8000. | Set according to the actual environment. |
| setRequestTimeoutMs(long requestTimeoutMs) | Sets the request timeout. Unit: ms. Default: 10000. | Adjust as needed. |
| setMaxAllowedSyncMsgTimeoutCnt(int maxAllowedSyncMsgTimeoutCnt) | Sets the number of timeouts after which a single DataProxy connection is disconnected. The SDK counts, per connection, the requests that timed out without receiving an Ack. If a connection's timeout count reaches this value within a short period, the SDK closes that connection and selects another DataProxy to create a new connection for data reporting. Default: 10. | If the DataProxy cluster is small, increase this value appropriately to avoid frequent disconnections in a short time. |
| setMgrConnTimeoutMs(int mgrConnTimeoutMs) | Sets the timeout for the SDK to connect to InLong Manager. Unit: ms. Default: 8000. | Increase it appropriately 1) when the network environment is poor, or 2) when the client takes a long time to resolve the domain name. |
| setMgrSocketTimeoutMs(int mgrSocketTimeoutMs) | Sets the socket timeout for the SDK when fetching the DataProxy list from InLong Manager. Unit: ms. Default: 15000. | Increase it appropriately when the network environment is poor. |
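The send buffer holds data that has been sent but not yet acknowledged, so its required size can be estimated with Little's law. Bytes in flight equal the send rate times the average message size times the Ack latency. The sketch below compares that estimate with the documented default of 16 * 1024 * 1024 bytes. SendBufferSizing is a hypothetical helper, and the workload numbers in main() are made up. This page does not say whether the buffer is per connection or per sender, so treat the result as a rough check rather than an exact limit.

```java
/** Hypothetical sizing helper, not part of the InLong SDK. */
public final class SendBufferSizing {

    /** Documented default of setSendBufferSize: 16 * 1024 * 1024 bytes. */
    public static final long DEFAULT_SEND_BUFFER_BYTES = 16L * 1024 * 1024;

    private SendBufferSizing() {
    }

    /**
     * Estimates bytes awaiting an Ack at steady state (Little's law):
     * send rate (messages/s) * average message size (bytes) * Ack latency (s).
     */
    public static long estimatedInFlightBytes(long messagesPerSecond, long avgMessageBytes,
            long ackLatencyMs) {
        return messagesPerSecond * avgMessageBytes * ackLatencyMs / 1000;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 20,000 msg/s of 1 KiB each, 500 ms Ack latency.
        long needed = estimatedInFlightBytes(20_000, 1024, 500);
        System.out.println("in-flight bytes: " + needed);
        System.out.println("fits default buffer: " + (needed <= DEFAULT_SEND_BUFFER_BYTES));
    }
}
```

With the example workload, the estimate is 10,240,000 bytes, which fits under the 16,777,216-byte default. Doubling the Ack latency to 1000 ms would raise it to 20,480,000 bytes, enough to fill the buffer and trigger ErrorCode.CONNECTION_UNWRITABLE.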