Skip to main content
Version: 1.8.0

Golang SDK

Create real-time synchronization task

Create a task on the Dashboard or with the command line tool, and set Auto Push (autonomous push) as the data source type.

Import Golang SDK

To use the Golang SDK, you need to import it into your projects. Import the Golang SDK:

import "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy"

Data report process

After importing the SDK, you can initialize a Client instance, and then call Send() or SendAsync() to produce messages to DataProxy, SDK will put the messages with a same StreamID into a batch and send them together. A demo can be found at: example_test.go. Basically, there are 3 steps to produce messages:

Initialize the SDK

client, err := dataproxy.NewClient(
dataproxy.WithGroupID("test"),
dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
dataproxy.WithMetricsName("test"),
)

if err != nil {
fmt.Println(err)
return
}

Parameters:

  • dataproxy.WithGroupID("test") sets the GroupID;
  • dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList") sets the Manager URL;
  • dataproxy.WithMetricsName("test") sets the value of the metrics label: "name" of the Client.

Call the Send/SendAsync method to send

The send methods of the SDK are goroutine safe, you can send a message synchronously or asynchronously, there are both synchronous and asynchronous examples in the demo.

Send synchronously :

for i := 0; i < 1000; i++ {
err := client.Send(context.Background(), dataproxy.Message{
GroupID: "test",
StreamID: "test",
Payload: []byte("test|a|b|c"),
})
if err !=nil {
fmt.Println(err)
}
}

Send asynchronously:

var success atomic.Uint64
var failed atomic.Uint64
for i := 0; i < 1000; i++ {
client.SendAsync(context.Background(),
dataproxy.Message{
GroupID: "test",
StreamID: "test",
Payload: []byte("test|a|b|c"),
},
func(msg dataproxy.Message, err error) {
if err != nil {
success.Add(1)
} else {
failed.Add(1)
}
})
}

// wait async send finish
time.Sleep(3 * time.Second)
fmt.Println("success:", success.Load())
fmt.Println("failed:", failed.Load())

The asynchronous way is recommended, as the synchronous way has no concurrency, messages can be sent after the previous one is done or timeout, it can't fulfill your requirements when you need high throughput.

Close the SDK

Closing the SDK can be done simply by calling the Close() method of the Client:

client.Close()

Notes

  • GroupID and URL are mandatory options when you initialize the SDK, you can call dataproxy.WithGroupID() and dataproxy.WithURL() to set them;
  • When you initialize more the one instance of Client, the MetricsName must be set to different values, or it will be failed when pulling metrics, you can call dataproxy.WithMetricsName() to set it;
  • The default values of the config options of the SDK are good enough for production, you don't need the change then unless you really need to do that, the default values are described in a section below;
  • The Send() method of the SDK has no concurrent, messages are sent one by one, it is NOT recommended;
  • The SDK will retry 2 times each message, if it still failed finally, it is up to you to decide what to do next.

Errors

Some common errors:

ErrorDesc
errors.New("URL is not given")Manager URL is not set.
errors.New("group ID is not given")Group ID is not set.
errors.New("invalid URL")Manager URL is invalid.
errors.New("invalid group ID")Group ID is invalid.
errors.New("service has no endpoints")Service has no endpoints, service discoery failed.
errors.New("no available worker")No available workers, workers are busy.
errNo{code: 10001, strCode: "10001", message: "message send timeout"}Send timeout.
errNo{code: 10002, strCode: "10002", message: "message send failed"}Send failed.
errNo{code: 10003, strCode: "10003", message: "producer already been closed"}Producer is closed.
errNo{code: 10004, strCode: "10004", message: "producer send queue is full"}Send queue is full, return immediately.
errNo{code: 10005, strCode: "10005", message: "message context expired"}Send queue is full, enqueue timeout.
errNo{code: 10010, strCode: "10010", message: "input log is invalid"}Input message is invalid, the payload may be empty.

Config options

OptionDefault valueDescOptional
WithGroupID()""Sets the Group ID.No
WithURL()""Sets the Manager URL.No
WithUpdateInterval()5mSets the update interval of service discoery.Yes
WithConnTimeout()3000msSets the connection timeout.Yes
WriteBufferSize16MSets the write buffer size.Yes
WithReadBufferSize16MSets the read buffer size.Yes
WithSocketSendBufferSize16MSets the socket send buffer size.Yes
WithSocketRecvBufferSize16MSets the socket receive buffer size.Yes
WithBufferPoolnilSets the buffer pool to use.Yes, if the application has one, use the same one is recommended.
WithBytePoolnilSets the byte pool to use.Yes, if the application has one, use the same one is recommended.
WithBufferPoolSize409600Sets the buffer pool size.Yes
WithBytePoolSize409600Sets the byte pool size.Yes
WithBytePoolWidthequals to BatchingMaxSizeSets the byte pool width.Yes
WithLoggerstd.outSets the debug logger.Yes, but the default one is not recommended, as it has no log levels control.
WithMetricsName"dataproxy-go"Sets the metrics name of this client.Yes, if there are more than one client instance in one application, the metrics names must be different.
WithMetricsRegistryprometheus.DefaultRegistererSets the metrics registry.Yes
WithWorkerNum8Sets the worker number.Yes
WithSendTimeout30000msSets the send timeout.Yes
WithMaxRetries2Sets the max retry count.Yes
WithBatchingMaxPublishDelay10msSets how long a message to wait for batching.Yes
WithBatchingMaxMessages10Sets the message number of a batch.Yes
WithBatchingMaxSize4KSets the batch size in Bytes of a batch.Yes
WithMaxPendingMessages409600Sets the queue length of each worker.Yes
WithBlockIfQueueIsFullfalseSets if block if the queue is full.Yes
WithAddColumnsnilSets the added columns to all the messages, DataProxy supports add addtional columns at specific positions to all messages,for example:__addcol1__worldid=xxx will add a column named 'worldid' at position 1 and its value is 'xxx'.Yes

options refers to options.go

Metrics

NameTypeLabelsDesc
data_proxy_error_countcountername: name we set with WithMetricsName()
code: error code
count the errors and the error code.
data_proxy_retry_countcountername: name we set with WithMetricsName()
worker: worker index
count the retry messages and the worker's index.
data_proxy_timeout_countcountername: name we set with WithMetricsName()
worker: worker index
count the timeout messages and the worker's index.
data_proxy_msg_countcountername: name we set with WithMetricsName()
code: error code
count the message handled and error code.
data_proxy_update_conn_countcountername: name we set with WithMetricsName()
code: error code
count the connection update/switch times and error code. when error code is '0', it is a normal update, otherwise, it may be network error.
data_proxy_pending_msg_gaugegaugename: name we set with WithMetricsName()
worker: error code
count the pending message in each worker and the worker index.
data_proxy_batch_sizehistogramname: name we set with WithMetricsName()
code: error code
distribution of the batch size and the error code.
data_proxy_batch_timehistogramname: name we set with WithMetricsName()
code: error code
distribution of the batch send time duration and the error code.

metrics refers to metrics.go

error codes refers to worker.go