Pulsar To Hive Example
Get the Module Archive
The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/, and the archive file is named apache-inlong-sort-standalone-${project.version}-bin.tar.gz.
Modify the Configuration Files
First, decompress the archive file, then copy the three files in the directory conf/hive/ to the directory conf/:
- conf/common.properties, common configuration of all components.
- conf/SortClusterConfig.conf, sink configuration of all sort tasks.
- conf/sid_hive_inlong6th_v3.conf, an example data source configuration of a sort task; the file name is the same as the sort task name in SortClusterConfig.conf.
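
The copy step can be scripted. Below is a minimal Python sketch, assuming the archive has already been decompressed and that `home` points at the extracted directory; the function name `copy_hive_conf` is our own and not part of InLong.

```python
import shutil
from pathlib import Path

# The three example files listed above, shipped under conf/hive/.
HIVE_CONF_FILES = [
    "common.properties",
    "SortClusterConfig.conf",
    "sid_hive_inlong6th_v3.conf",
]


def copy_hive_conf(home: Path) -> list[Path]:
    """Copy the Pulsar-to-Hive example files from <home>/conf/hive/ to <home>/conf/."""
    src_dir = home / "conf" / "hive"
    dst_dir = home / "conf"
    copied = []
    for name in HIVE_CONF_FILES:
        src = src_dir / name
        if not src.is_file():
            raise FileNotFoundError(f"example file not found: {src}")
        # copy2 also preserves timestamps; an existing file in conf/ is overwritten.
        copied.append(Path(shutil.copy2(src, dst_dir / name)))
    return copied
```

Run it from the extracted directory with `copy_hive_conf(Path("."))`.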
Example: conf/common.properties
```properties
# inlong-sort-standalone cluster id
clusterId=hivev3-sz-sz1
# Current node ID
nodeId=nodeId
# Metric domain name
metricDomains=Sort
# Class names of the metric listeners, separated by spaces
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
# Interval for snapshotting metric data (milliseconds)
metricDomains.Sort.snapshotInterval=60000
# Channel class name
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
# Sink class name. Different distribution types use different Sink classes
sortSink.type=org.apache.inlong.sort.standalone.sink.hive.HiveSink
# Source class name
sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
# There are three ways to load cluster configuration data: [file, Manager, custom class].
sortClusterConfig.type=file
# When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath
sortClusterConfig.file=SortClusterConfig.conf
# There are three ways to load the Sort task configuration data: [file, Manager, custom class]
sortSourceConfig.QueryConsumeConfigType=file
```
Example: conf/SortClusterConfig.conf
```json
{
  "data": {
    "clusterName": "hivev3-sz-sz1",
    "sortTasks": [
      {
        "idParams": [
          {
            "inlongGroupId": "0fc00000046",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_0fc00000046",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_0fc00000046",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "03600000045",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_03600000045",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_03600000045",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "05100054990",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_05100054990",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_05100054990",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "09c00014434",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_09c00014434",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_09c00014434",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "0c900035509",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_0c900035509",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_0c900035509",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          }
        ],
        "name": "sid_hive_inlong6th_v3",
        "sinkParams": {
          "hdfsPath": "hdfs://127.0.0.1:9000",
          "maxFileOpenDelayMinute": "5",
          "tokenOvertimeMinute": "60",
          "maxOutputFileSizeGb": "2",
          "hiveJdbcUrl": "jdbc:hive2://127.0.0.2:10000",
          "hiveDatabase": "default",
          "hiveUsername": "hive",
          "hivePassword": "hive"
        },
        "type": "HIVE"
      }
    ]
  },
  "errCode": 0,
  "md5": "md5",
  "result": true
}
```
Example: conf/sid_hive_inlong6th_v3.conf
```json
{
  "sortClusterName": "hivev3-sz-sz1",
  "sortTaskId": "sid_hive_inlong6th_v3",
  "cacheZones": {
    "pc_inlong6th_sz1": {
      "zoneName": "${PULSAR_CLUSTER_NAME}",
      "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
      "authentication": "${PULSAR_AUTH}",
      "topics": [
        {
          "topic": "${TENANT/NAMESPACE/TOPIC}",
          "partitionCnt": 10,
          "topicProperties": {}
        }
      ],
      "cacheZoneProperties": {},
      "zoneType": "pulsar"
    }
  }
}
```
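
The `${...}` values in this example are placeholders for your own Pulsar settings. If you keep the file as a template, a small Python helper like the one below can fill them in and confirm the result is still valid JSON. It is a hypothetical helper, not part of InLong; `fill_placeholders` and `render_task_conf` are names chosen for illustration.

```python
import json
import re

# Matches ${NAME}; NAME may contain slashes, as in ${TENANT/NAMESPACE/TOPIC}.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def fill_placeholders(template: str, values: dict[str, str]) -> str:
    """Replace each ${NAME} in a JSON template with values[NAME], JSON-escaped."""
    missing = sorted(set(PLACEHOLDER.findall(template)) - set(values))
    if missing:
        raise KeyError(f"no value given for placeholders: {missing}")
    # json.dumps(...)[1:-1] escapes quotes and backslashes without adding outer quotes.
    return PLACEHOLDER.sub(lambda m: json.dumps(values[m.group(1)])[1:-1], template)


def render_task_conf(template_path: str, values: dict[str, str]) -> dict:
    """Fill a task configuration template and check that the result parses as JSON."""
    with open(template_path, encoding="utf-8") as f:
        return json.loads(fill_placeholders(f.read(), values))
```

Failing on any unfilled placeholder is deliberate: a literal `${PULSAR_IP}` left in `serviceUrl` would otherwise only surface as a connection error at runtime.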
Configuration For conf/common.properties
| Parameter | Required | DefaultValue | Remark |
|---|---|---|---|
| clusterId | Y | NA | Unique identifier of the inlong-sort-standalone cluster |
| nodeId | N | Localhost IP | Current node ID |
| metricDomains | N | Sort | Metric domain name |
| metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | Class names of the metric listeners, separated by spaces |
| metricDomains.Sort.snapshotInterval | N | 60000 | Interval for snapshotting metric data, in ms |
| prometheusHttpPort | N | 8080 | HTTP server port of Prometheus, used by org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener |
| sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel type |
| sortSink.type | Y | NA | Sink class name. Different distribution types use different Sink classes. |
| sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
| sortClusterConfig.type | N | manager | There are three ways to load cluster configuration data: [file, Manager, custom class]. |
| sortClusterConfig.file | N | SortClusterConfig.conf | When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath |
| sortClusterConfig.managerUrl | N | NA | When the cluster configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig |
| sortSourceConfig.QueryConsumeConfigType | N | manager | There are three ways to load the Sort task configuration data: [file, Manager, custom class]. If the loading type is file, the Sort task configuration file must be in the classpath, and its file name format is ${sortTaskId}.conf. |
| sortSourceConfig.managerUrl | N | NA | When the Sort task configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource |
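
A quick way to catch a missing required key before starting the application is to merge the file with the defaults listed in the table above. The Python sketch below does that under stated assumptions: it understands only `key=value` lines and `#`/`!` comments (real Java properties files also allow `:` separators, escapes and line continuations), and it treats each `managerUrl` key as mandatory whenever the matching type is `manager`, which is our reading of the table rather than documented behavior. All names in it are hypothetical.

```python
# Defaults taken from the table above; None marks a required key without a default.
# nodeId is omitted because its default (the local IP) is computed at runtime.
DEFAULTS = {
    "clusterId": None,
    "metricDomains": "Sort",
    "metricDomains.Sort.domainListeners":
        "org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener",
    "metricDomains.Sort.snapshotInterval": "60000",
    "prometheusHttpPort": "8080",
    "sortChannel.type": "org.apache.inlong.sort.standalone.channel.BufferQueueChannel",
    "sortSink.type": None,
    "sortSource.type": "org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource",
    "sortClusterConfig.type": "manager",
    "sortClusterConfig.file": "SortClusterConfig.conf",
    "sortSourceConfig.QueryConsumeConfigType": "manager",
}

# (type key, URL key): loading from the manager needs the matching managerUrl (assumption).
MANAGER_URL_KEYS = [
    ("sortClusterConfig.type", "sortClusterConfig.managerUrl"),
    ("sortSourceConfig.QueryConsumeConfigType", "sortSourceConfig.managerUrl"),
]


def parse_properties(text: str) -> dict[str, str]:
    """Parse key=value lines; a simplified subset of the Java .properties format."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # blank line or comment
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {raw!r}")
        props[key.strip()] = value.strip()
    return props


def load_common_properties(text: str) -> dict[str, str]:
    """Apply the documented defaults and check the required keys."""
    props = parse_properties(text)
    missing = [k for k, v in DEFAULTS.items() if v is None and k not in props]
    if missing:
        raise ValueError(f"missing required keys: {missing}")
    merged = {k: v for k, v in DEFAULTS.items() if v is not None}
    merged.update(props)
    for type_key, url_key in MANAGER_URL_KEYS:
        if merged[type_key].lower() == "manager" and url_key not in merged:
            raise ValueError(f"{url_key} is required when {type_key}=manager")
    return merged
```

Note that the defaults for both `*.type` keys are `manager`, so a file that omits them (unlike the example, which sets both to `file`) must also provide the two manager URLs.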
Configuration For SortClusterConfig.conf
- Can be read from the SortClusterConfig.conf file in the ClassPath, but this does not support real-time updates.
- Can be obtained from the HTTP interface of Inlong Manager, which supports real-time updates.
| Parameter | Required | Type | DefaultValue | Remark |
|---|---|---|---|---|
| clusterName | Y | String | NA | inlong-sort-standalone cluster unique identifier |
| sortTasks | Y | JsonArray<SortTaskConfig> | NA | Sort task list |
Configuration For SortTaskConfig
| Parameter | Required | DefaultValue | Remark |
|---|---|---|---|
| name | Y | NA | Sort task name |
| type | Y | NA | Sort task type, such as HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), UNKNOWN("n") |
| idParams | Y | NA | Inlong data stream parameter list |
| sinkParams | Y | NA | Sink parameters of the Sort task |
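
Because the classpath copy of SortClusterConfig.conf is not reloaded at runtime, it is worth validating before deployment. A minimal Python sketch, assuming the `data` wrapper used in the example above; the function name is hypothetical and the checks cover only the Required column of the two tables.

```python
import json


def summarize_cluster_config(text: str) -> dict[str, str]:
    """Check the required SortClusterConfig fields and return {task name: task type}."""
    data = json.loads(text)["data"]  # the example wraps the config in a "data" object
    if not isinstance(data.get("clusterName"), str) or not data["clusterName"]:
        raise ValueError("clusterName must be a non-empty string")
    tasks = data.get("sortTasks")
    if not isinstance(tasks, list):
        raise ValueError("sortTasks must be a JSON array")
    summary = {}
    for task in tasks:
        # Required SortTaskConfig fields and the JSON type each is expected to have.
        for field, kind in (("name", str), ("type", str), ("idParams", list), ("sinkParams", dict)):
            if not isinstance(task.get(field), kind):
                raise ValueError(f"task {task.get('name')!r}: {field} must be a {kind.__name__}")
        summary[task["name"]] = task["type"]
    return summary
```

For the example file this returns `{"sid_hive_inlong6th_v3": "HIVE"}`.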
IdParams Configuration For Sort-Hive Task
| Parameter | Required | DefaultValue | Remark |
|---|---|---|---|
| inlongGroupId | Y | NA | inlongGroupId |
| inlongStreamId | Y | NA | inlongStreamId |
| separator | Y | NA | Delimiter |
| partitionIntervalMs | N | 3600000 | Partition interval, in milliseconds |
| idRootPath | Y | NA | HDFS root directory of Inlong data stream |
| partitionSubPath | Y | NA | Partition subdirectories for inlong data streams |
| hiveTableName | Y | NA | Hive table name of the Inlong data stream |
| partitionFieldName | N | dt | Partition field name of the Inlong data stream |
| partitionFieldPattern | Y | NA | Format of the partition field value of the Inlong data stream, such as yyyyMMdd, yyyyMMddHH or yyyyMMddHHmm |
| msgTimeFieldPattern | Y | NA | Format of the message generation time, as a Java date-time pattern such as yyyy-MM-dd HH:mm:ss |
| maxPartitionOpenDelayHour | N | 8 | Maximum open delay of a partition, in hours |
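
To see how these parameters combine, the Python sketch below maps one message time to an HDFS directory and a partition field value. This is our reading of the parameter descriptions, not code taken from HiveSink: we assume partitions are aligned to multiples of partitionIntervalMs on the wall-clock message time (no time zone conversion), and that each `{pattern}` in partitionSubPath is formatted with the partition start time. Only the Java pattern letters used in this example are translated, and the function names are hypothetical.

```python
import re
from datetime import datetime, timedelta

# Java date-time pattern letters used in this example and their strftime equivalents.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}
_JAVA_TOKEN = re.compile("|".join(_JAVA_TO_STRFTIME))
_EPOCH = datetime(1970, 1, 1)


def to_strftime(java_pattern: str) -> str:
    """Translate a Java pattern such as yyyy-MM-dd HH:mm:ss into strftime syntax."""
    return _JAVA_TOKEN.sub(lambda m: _JAVA_TO_STRFTIME[m.group(0)], java_pattern)


def partition_for(msg_time: str, id_params: dict) -> tuple[str, str]:
    """Return (HDFS directory, partition field value) for one message time."""
    wall = datetime.strptime(msg_time, to_strftime(id_params["msgTimeFieldPattern"]))
    interval_ms = id_params.get("partitionIntervalMs", 3600000)
    # Floor the wall-clock time to the start of its partition interval.
    offset_ms = (wall - _EPOCH) // timedelta(milliseconds=1)
    start = _EPOCH + timedelta(milliseconds=offset_ms - offset_ms % interval_ms)
    # Every {pattern} in partitionSubPath is formatted with the partition start time.
    sub_path = re.sub(
        r"\{([^}]+)\}",
        lambda m: start.strftime(to_strftime(m.group(1))),
        id_params["partitionSubPath"],
    )
    value = start.strftime(to_strftime(id_params["partitionFieldPattern"]))
    return id_params["idRootPath"] + sub_path, value
```

With the first idParams entry of the example, a message time of `2022-03-01 13:45:10` maps to `/user/hive/warehouse/t_inlong_v1_0fc00000046/20220301/2022030113` with `dt` value `2022030113`.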
SinkParams Configuration For Sort-Hive Task
| Parameter | Required | DefaultValue | Remark |
|---|---|---|---|
| hdfsPath | Y | NA | HDFS NameNode address, such as hdfs://127.0.0.1:9000 |
| maxFileOpenDelayMinute | N | 5 | Maximum write time of a single HDFS file, in minutes |
| tokenOvertimeMinute | N | 60 | The maximum time it takes to create a token for a partition of a single Inlong data stream, in minutes |
| maxOutputFileSizeGb | N | 2 | Maximum size of a single HDFS file, in GB |
| hiveJdbcUrl | Y | NA | Hive JDBC URL, such as jdbc:hive2://127.0.0.2:10000 |
| hiveDatabase | Y | NA | Hive Database |
| hiveUsername | Y | NA | Hive Username |
| hivePassword | Y | NA | Hive Password |
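
Taken together, maxFileOpenDelayMinute and maxOutputFileSizeGb bound every HDFS file by age and by size. The Python sketch below models that as a single roll-over check; it assumes a file is closed as soon as either limit is reached and that 1 GB means 1024^3 bytes, neither of which the table states. The class and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class OpenHdfsFile:
    opened_at_ms: int  # time the file was opened, epoch milliseconds
    size_bytes: int    # bytes written so far


def should_roll(f: OpenHdfsFile, now_ms: int, sink_params: dict) -> bool:
    """True when the file has been open too long or has grown too large."""
    # sinkParams values are strings in the example, hence int(); defaults from the table.
    max_open_ms = int(sink_params.get("maxFileOpenDelayMinute", "5")) * 60_000
    max_bytes = int(sink_params.get("maxOutputFileSizeGb", "2")) * 1024 ** 3
    return now_ms - f.opened_at_ms >= max_open_ms or f.size_bytes >= max_bytes
```

With the example values, a file is rolled after 5 minutes even if it is still small, so lowering maxFileOpenDelayMinute trades fewer late-visible rows for more small files.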
sid_hive_inlong6th_v3.conf Configuration For Sort-Hive Task
- File name format: Sort task name + .conf.
- Can be read from the file of that name in the ClassPath, but this does not support real-time updates.
- Can be obtained from the HTTP interface of Inlong Manager, which supports real-time updates.
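
When the file source is used, each task listed in SortClusterConfig.conf needs a matching `<sort task name>.conf`. A small Python check, assuming the files live in conf/ (the directory the example files are copied to); the helper names are hypothetical.

```python
from pathlib import Path


def task_conf_path(conf_dir: Path, sort_task_name: str) -> Path:
    """File name format: <sort task name>.conf, e.g. sid_hive_inlong6th_v3.conf."""
    return conf_dir / f"{sort_task_name}.conf"


def missing_task_confs(conf_dir: Path, task_names: list[str]) -> list[str]:
    """Return the task names whose configuration file is absent from conf_dir."""
    return [name for name in task_names if not task_conf_path(conf_dir, name).is_file()]
```

Once the example files are in place, `missing_task_confs(Path("conf"), ["sid_hive_inlong6th_v3"])` returns an empty list.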
Configuration For sid_hive_inlong6th_v3.conf
| Parameter | Required | Type | DefaultValue | Remark |
|---|---|---|---|---|
| sortClusterName | Y | String | NA | Unique identifier of the inlong-sort-standalone cluster |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache layer cluster list, format: Map<cacheClusterName, CacheCluster> |
Configuration For CacheCluster
| Parameter | Required | Type | DefaultValue | Remark |
|---|---|---|---|---|
| zoneName | Y | String | NA | Cache layer cluster name |
| zoneType | Y | String | NA | Cache type: [pulsar,tube,kafka] |
| serviceUrl | Y | String | NA | Pulsar's serviceUrl parameter, or Kafka's Broker list |
| authentication | Y | String | NA | Pulsar Authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache layer Consumer parameters |
| topics | N | List<Topic> | NA | List of topics consumed by the cache layer |
Configuration For Topic
| Parameter | Required | Type | DefaultValue | Remark |
|---|---|---|---|---|
| topic | Y | String | NA | Topic full name, Pulsar: tenant/namespace/topic |
| partitionCnt | Y | Integer | NA | Number of Topic Partitions |
| topicProperties | N | Map<String,String> | NA | Consumer parameters of cache layer topics |
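
To see which Pulsar partitions a task covers, the topic and partitionCnt fields can be expanded using Pulsar's own naming rules: a short name tenant/namespace/topic resolves to persistent://tenant/namespace/topic, and partition i of a partitioned topic is named `<topic>-partition-<i>`. The Python sketch below does only that; it is not taken from the InLong Sort SDK, and the function name is hypothetical. It is mainly useful for checking partitionCnt against the partition count configured in Pulsar.

```python
def pulsar_partition_topics(task_conf: dict) -> list[str]:
    """List the partition topics of every Pulsar cache zone in a task configuration."""
    names = []
    for zone in task_conf["cacheZones"].values():
        if zone["zoneType"] != "pulsar":
            continue  # tube and kafka zones use other naming schemes
        for entry in zone.get("topics", []):
            # Raises ValueError unless the name has exactly three parts.
            tenant, namespace, topic = entry["topic"].split("/")
            base = f"persistent://{tenant}/{namespace}/{topic}"
            # Pulsar names partition i of a partitioned topic "<topic>-partition-<i>".
            names.extend(f"{base}-partition-{i}" for i in range(entry["partitionCnt"]))
    return names
```

For the example file, once the topic placeholder is filled in, this yields 10 names ending in `-partition-0` through `-partition-9`.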
Start inlong-sort-standalone Application
Finally, execute the script ./bin/sort-start.sh to start the sort-standalone application.
You can then check the log file sort.log to confirm the startup status.