Kafka
Overview
The Kafka Load Node writes data into Kafka topics, either in the normal fashion or in the upsert fashion. In upsert mode, the connector consumes a changelog stream: it writes INSERT/UPDATE_AFTER rows as normal Kafka message values, and writes DELETE rows as Kafka messages with null values (a tombstone for the key).
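To make the upsert behaviour concrete, here is a minimal Python sketch (not the connector's implementation) that maps changelog entries to Kafka (key, value) pairs. The function `to_kafka_records`, the row-kind strings, and the `key_of`/`serialize` callables are hypothetical names chosen for illustration. The sketch models only the three row kinds described above and skips any others.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def to_kafka_records(
    changelog: List[Tuple[str, Row]],
    key_of: Callable[[Row], str],
    serialize: Callable[[Row], str],
) -> List[Tuple[str, Optional[str]]]:
    """Map (row_kind, row) changelog entries to (key, value) Kafka records.

    Hypothetical helper: INSERT/UPDATE_AFTER become normal values and
    DELETE becomes a tombstone (value None). Other row kinds are skipped.
    """
    records: List[Tuple[str, Optional[str]]] = []
    for kind, row in changelog:
        key = key_of(row)
        if kind in ("INSERT", "UPDATE_AFTER"):
            records.append((key, serialize(row)))  # latest value for this key
        elif kind == "DELETE":
            records.append((key, None))  # null value = tombstone for the key
    return records


changelog = [
    ("INSERT", {"id": 1, "name": "a"}),
    ("UPDATE_AFTER", {"id": 1, "name": "b"}),
    ("DELETE", {"id": 1, "name": "b"}),
]
print(to_kafka_records(changelog, lambda r: str(r["id"]), lambda r: f"{r['id']},{r['name']}"))
# [('1', '1,a'), ('1', '1,b'), ('1', None)]
```

Because every record for id 1 shares the same key, a compacted topic eventually keeps only the tombstone, which is how a downstream reader learns the row was deleted.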
Supported Version
| Load Node | Kafka version | 
|---|---|
| Kafka | 0.10+ | 
Dependencies
To set up the Kafka Load Node, use the following dependency information, both for projects built with a build automation tool (such as Maven or SBT) and for the SQL Client with the Sort Connectors JAR bundles.
Maven dependency
<dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-kafka</artifactId>
    <version>${siteVariables.inLongVersion}</version>
</dependency>
How to create a Kafka Load Node
Usage for SQL API
The example below shows how to create a Kafka Load Node with Flink SQL:
- connector is kafka-inlong
-- Create a Kafka table 'kafka_load_node' in Flink SQL
Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING
    ) WITH (
        'connector' = 'kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'format' = 'csv'
    );
- connector is upsert-kafka-inlong
-- Create a Kafka table 'kafka_load_node' in Flink SQL
Flink SQL> CREATE TABLE kafka_load_node (
        `id` INT,
        `name` STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka-inlong',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'csv',
        'value.format' = 'csv'
    );
Usage for InLong Dashboard
When creating a data flow, select Kafka for the data stream direction, and click "Add" to configure it.

Usage for InLong Manager Client
TODO: It will be supported in the future.
Kafka Load Node Options
| Option | Required | Default | Type | Description | 
|---|---|---|---|---|
| connector | required | (none) | String | Specify which connector to use. Valid values are: 1. for upsert Kafka, use upsert-kafka-inlong; 2. for normal Kafka, use kafka-inlong. |
| topic | required | (none) | String | Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with semicolons, like topic-1;topic-2. Note that only one of topic-pattern and topic can be specified for sources. When the table is used as a sink, this is the topic to write data to, and it is also the default topic used when topic-pattern cannot be resolved. |
| topic-pattern | optional | (none) | String | Dynamic topic extraction pattern, like ${VARIABLE_NAME}, which is only used in Kafka multiple sink scenarios and is valid when format is raw. |
| sink.multiple.format | optional | (none) | String | Format of the Kafka raw data; currently only canal-json and debezium-json are supported. It is only used in Kafka multiple sink scenarios and is valid when format is raw. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka Configuration documentation. Flink removes the properties. key prefix and passes the transformed keys and values to the underlying Kafka client. For example, you can disable automatic topic creation via properties.allow.auto.create.topics=false. However, some configurations cannot be set because Flink overrides them, e.g. key.deserializer and value.deserializer. |
| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: either this option or the value.format option is required. |
| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: if a key format is defined, the key.fields option is required as well; otherwise the Kafka records will have an empty key. |
| key.fields | optional | [] | List<String> | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like field1;field2. |
| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields work with prefixed names. When constructing the data type of the key format, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires value.fields-include to be set to EXCEPT_KEY. |
| value.format | required for upsert Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
| value.fields-include | optional | ALL | Enum (ALL, EXCEPT_KEY) | Defines how key columns are handled in the data type of the value format. By default, ALL physical columns of the table schema are included in the value format, which means that key columns appear in the data types of both the key and the value format. |
| sink.partitioner | optional | default | String | Output partitioning from Flink partitions into Kafka partitions. Valid values are: default: use the Kafka default partitioner to partition records; fixed: each Flink partition ends up in at most one Kafka partition; round-robin: a Flink partition is distributed to Kafka partitions sticky round-robin, which only works when record keys are not specified; raw-hash: extract a value based on sink.multiple.partition-pattern and hash it to obtain the final partition, which is only used in Kafka multiple sink scenarios and is valid when format is raw; a custom FlinkKafkaPartitioner subclass, e.g. org.mycompany.MyPartitioner. See Sink Partitioning in the Flink Kafka connector documentation for more details. |
| sink.multiple.partition-pattern | optional | (none) | String | Dynamic partition extraction pattern, like ${VARIABLE_NAME}, which is only used in Kafka multiple sink scenarios and is valid when format is raw. |
| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are at-least-once, exactly-once and none. See Consistency guarantees for more details. |
| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
| inlong.metric.labels | optional | (none) | String | InLong metric labels; the value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
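The fixed option of sink.partitioner can be pictured with the following Python sketch. It follows the rule used by Flink's FlinkFixedPartitioner as we understand it: each sink subtask always writes to `partitions[subtask_index % number_of_partitions]`. The function name `fixed_partition` is hypothetical.

```python
from typing import List


def fixed_partition(subtask_index: int, partitions: List[int]) -> int:
    """Pick the single Kafka partition that a given Flink sink subtask writes to."""
    if not partitions:
        raise ValueError("partitions of the target topic are empty")
    # The choice depends only on the subtask index, never on the record,
    # so every record from one subtask lands in the same Kafka partition.
    return partitions[subtask_index % len(partitions)]


# 4 sink subtasks, 2 Kafka partitions: subtasks wrap around the partition list.
print([fixed_partition(i, [0, 1]) for i in range(4)])  # [0, 1, 0, 1]
# 2 sink subtasks, 4 Kafka partitions: partitions 2 and 3 receive no data.
print([fixed_partition(i, [0, 1, 2, 3]) for i in range(2)])  # [0, 1]
```

The second case shows why fixed partitioning is usually paired with a sink.parallelism at least as large as the partition count: with fewer subtasks than partitions, some partitions stay empty.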
Available Metadata
It supports writing metadata for the canal-json-inlong format.
See the Kafka Extract Node for a list of all available metadata fields.
Features
Support Dynamic Schema Writing
Dynamic schema writing extracts the topic and partition from the data itself and writes each record to the corresponding topic
and partition. To enable it, set the Kafka format to raw, and also set the serialization format of the upstream data
via the option sink.multiple.format (currently only canal-json and debezium-json are supported).
Dynamic Topic Extraction
Dynamic topic extraction parses the topic pattern and extracts the topic from the data.
To enable it, set the option topic-pattern; the Kafka Load Node resolves topic-pattern into the final topic.
If parsing fails, the record is written to the default topic set via topic. topic-pattern supports constants and variables: constants are string constants,
and variables are strictly represented as ${VARIABLE_NAME}. The value of a variable comes from the data itself, that is, either
a metadata field of the format specified by sink.multiple.format, or a physical field in the data.
Examples of topic-pattern are as follows:
- sink.multiple.format is canal-json:
The upstream data is:
{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
} 
topic-pattern is ${database}_${table}, and the extracted topic is inventory_products (database and table are metadata fields).
topic-pattern is ${database}_${table}_${id}, and the extracted topic is inventory_products_111 (database and table are metadata fields, and id is a physical field).
- sink.multiple.format is debezium-json:
The upstream data is:
{
  "before": {
    "id": 4,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 4,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {
    "db": "inventory",
    "table": "products"
  },
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
topic-pattern is ${source.db}_${source.table}, and the extracted topic is inventory_products (source.db and source.table are metadata fields).
topic-pattern is ${source.db}_${source.table}_${id}, and the extracted topic is inventory_products_4 (source.db and source.table are metadata fields, and id is a physical field).
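The resolution rules above can be sketched in Python as follows. This is an illustration under stated assumptions, not the connector's code: `resolve_topic`, `canal_lookup`, and `debezium_lookup` are hypothetical names; the set of canal-json keys treated as metadata is an assumption (it deliberately excludes the top-level `id`, so `${id}` resolves to the physical field, matching the example above); canal-json physical fields are read from the first entry of `data`; and debezium-json physical fields are read from `after`, falling back to `before`.

```python
import re
from typing import Any, Callable, Dict, Optional

Message = Dict[str, Any]

# ${VARIABLE_NAME}; text outside a match is kept as a string constant.
VARIABLE = re.compile(r"\$\{([^}]+)\}")

# Assumption: top-level canal-json keys treated as metadata in this sketch.
CANAL_METADATA = {"database", "table", "type", "es", "ts"}


def canal_lookup(message: Message, name: str) -> Optional[Any]:
    if name in CANAL_METADATA:
        return message.get(name)
    rows = message.get("data") or []
    return rows[0].get(name) if rows else None  # physical field of the first row


def debezium_lookup(message: Message, name: str) -> Optional[Any]:
    if "." in name:  # e.g. source.db: walk the nested metadata path
        node: Any = message
        for part in name.split("."):
            if not isinstance(node, dict) or part not in node:
                return None
            node = node[part]
        return node
    row = message.get("after") or message.get("before") or {}
    return row.get(name)  # physical field, taken from "after" when present


def resolve_topic(
    pattern: str,
    message: Message,
    lookup: Callable[[Message, str], Optional[Any]],
    default_topic: str,
) -> str:
    def substitute(match) -> str:
        value = lookup(message, match.group(1))
        if value is None:
            raise KeyError(match.group(1))  # unresolved variable: parsing failed
        return str(value)

    try:
        return VARIABLE.sub(substitute, pattern)
    except KeyError:
        return default_topic  # fall back to the topic option


canal = {"data": [{"id": "111", "name": "scooter"}], "database": "inventory",
         "id": 9, "table": "products", "type": "UPDATE"}
debezium = {"after": {"id": 4, "name": "scooter"},
            "source": {"db": "inventory", "table": "products"}, "op": "u"}
print(resolve_topic("${database}_${table}_${id}", canal, canal_lookup, "user"))
print(resolve_topic("${source.db}_${source.table}_${id}", debezium, debezium_lookup, "user"))
```

The two calls print inventory_products_111 and inventory_products_4, matching the examples above. A pattern that references a missing field, such as ${unknown}, resolves to the default topic user.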
Dynamic Partition Extraction
Dynamic partition extraction extracts the partition from the data by parsing a partition pattern, similar to dynamic topic extraction.
To enable it, set the option sink.partitioner to raw-hash
and set the option sink.multiple.partition-pattern. The Kafka Load Node resolves sink.multiple.partition-pattern
into the partition key, hashes the partition key, and takes the remainder of the hash divided by the number of partitions as the final partition.
If parsing fails, it returns null and Kafka's default partitioning strategy is applied. sink.multiple.partition-pattern
supports constants, variables and the primary key. Constants are string constants. Variables are strictly represented as ${VARIABLE_NAME}, and the value of a variable comes from the data itself,
that is, either a metadata field of the format specified by sink.multiple.format, or a physical field in the data.
The primary key is a special constant, PRIMARY_KEY, which extracts the primary key value of the record according to the data format.
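The hash-and-remainder step can be sketched in Python as below, assuming the partition key has already been resolved from the pattern. `raw_hash_partition` is a hypothetical name, and `zlib.crc32` is only a stand-in hash chosen for this sketch, so the partition numbers it produces will not match the connector's.

```python
import zlib
from typing import Optional


def raw_hash_partition(partition_key: Optional[str], num_partitions: int) -> Optional[int]:
    """Hash a resolved partition key onto one of num_partitions partitions.

    Returns None when the key could not be resolved, meaning the caller
    should fall back to Kafka's default partitioning strategy.
    """
    if partition_key is None:
        return None
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # Assumption: CRC32 stands in for the connector's own hash function.
    digest = zlib.crc32(partition_key.encode("utf-8"))  # non-negative in Python 3
    return digest % num_partitions  # remainder of the partition count


p = raw_hash_partition("inventory_products_111", 6)
print(p, raw_hash_partition("inventory_products_111", 6) == p)  # same key, same partition
print(raw_hash_partition(None, 6))  # None
```

The important property is determinism: records that resolve to the same key always land in the same partition, which preserves per-key ordering.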
Note: Kafka dynamic partition extraction based on PRIMARY_KEY has a limitation: the primary key information must be present in the data.
For example, if the format is canal-json, the primary key is given by pkNames. Because the debezium-json format does not define a primary key,
we adopt the convention that the primary key of debezium-json is also given by pkNames, which is included in source like other metadata fields such as table and db.
If you partition by primary key and the format is debezium-json, make sure the actual data follows this convention.
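The PRIMARY_KEY lookup described in the note can be sketched in Python as follows. `primary_key_value` is a hypothetical name. The sketch reads canal-json keys from the first entry of `data`, reads debezium-json keys from `after` (falling back to `before`) using `source.pkNames` per the convention above, and joins composite keys with `_`. That joiner is an assumption made only for this illustration.

```python
from typing import Any, Dict, Optional


def primary_key_value(message: Dict[str, Any], fmt: str) -> Optional[str]:
    """Extract the primary key value of a record, or None if it is missing."""
    if fmt == "canal-json":
        pk_names = message.get("pkNames")
        rows = message.get("data") or []
        row = rows[0] if rows else {}
    elif fmt == "debezium-json":
        # Convention from the note above: pkNames sits inside "source".
        pk_names = (message.get("source") or {}).get("pkNames")
        row = message.get("after") or message.get("before") or {}
    else:
        raise ValueError(f"unsupported format: {fmt}")
    if not pk_names:
        return None  # no primary key info: fall back to default partitioning
    values = [row.get(name) for name in pk_names]
    if any(v is None for v in values):
        return None
    return "_".join(str(v) for v in values)  # hypothetical joiner for composite keys


canal = {"data": [{"id": "111", "name": "scooter"}], "pkNames": ["id"]}
debezium = {"after": {"id": 4}, "source": {"db": "inventory", "table": "products", "pkNames": ["id"]}}
print(primary_key_value(canal, "canal-json"))  # 111
print(primary_key_value(debezium, "debezium-json"))  # 4
```

A debezium-json record whose source lacks pkNames yields None, so it falls back to Kafka's default partitioning, which is exactly the situation the note warns about.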
Data Type Mapping
Kafka stores message keys and values as bytes, so Kafka has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro, so the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.