SQLServer-CDC
Overview
The SQLServer Extract Node reads data and incremental data from the SQLServer database. The following will describe how to set up the SQLServer extraction node.
Supported Version
| Extract Node | Version |
|---|---|
| SQLServer-cdc | SQLServer: 2014、2016、2017、2019、2022 |
Dependencies
Introduce related SQLServer Extract Node dependencies through maven. Of course, you can also use INLONG to provide jar packages.(sort-connector-sqlserver-cdc)
Maven dependency
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-sqlserver-cdc</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
Setup SQLServer Extract Node
SQLServer Extract Node needs to open related libraries and tables, the steps are as follows:
- Enable the CDC function for the database.
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
exec sys.sp_cdc_enable_db
end
- Check the database CDC capability status.
select is_cdc_enabled from sys.databases where name='dbName'
note: 1 is running CDC of DB.
- Turn on CDC for the table
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'tableName', -- table_name
@capture_instance = NULL, -- capture_instance
@supports_net_changes = 1, -- supports_net_changes
@role_name = NULL, -- role_name
@index_name = NULL, -- index_name
@captured_column_list = NULL, -- captured_column_list
@filegroup_name = 'PRIMARY' -- filegroup_name
END
note: The table must have a primary key or unique index.
- Check the table CDC capability status.
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'
note: 1 is running CDC of table.
How to create a SQLServer Extract Node
Usage for SQL API
The example below shows how to create a SQLServer Extract Node with Flink SQL Cli :
-- Set checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- Create a SQLServer table 'sqlserver_extract_node' in Flink SQL Cli
Flink SQL> CREATE TABLE sqlserver_extract_node (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc-inlong',
'hostname' = 'YourHostname',
'port' = 'port', --default:1433
'username' = 'YourUsername',
'password' = 'YourPassword',
'database-name' = 'YourDatabaseName',
'schema-name' = 'YourSchemaName' -- default:dbo
'table-name' = 'YourTableName');
-- Read snapshot and binlog from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;
Usage for InLong Dashboard
TODO
Usage for InLong Manager Client
TODO
SQLServer Extract Node Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, here should be 'sqlserver-cdc-inlong'. |
| hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
| username | required | (none) | String | Username to use when connecting to the SQLServer database. |
| password | required | (none) | String | Password to use when connecting to the SQLServer database. |
| database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
| schema-name | required | dbo | String | Schema name of the SQLServer database to monitor. |
| table-name | required | (none) | String | Table name of the SQLServer database to monitor. |
| port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
| server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". |
| inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
Available Metadata
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
| Key | DataType | Description |
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contain the row. |
| schema_name | STRING NOT NULL | Name of the schema that contain the row. |
| database_name | STRING NOT NULL | Name of the database that contain the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0. |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE sqlserver_extract_node (
table_name STRING METADATA FROM 'table_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
id INT NOT NULL
) WITH (
'connector' = 'sqlserver-cdc-inlong',
'hostname' = 'localhost',
'port' = '1433',
'username' = 'sa',
'password' = 'password',
'database-name' = 'test',
'schema-name' = 'dbo',
'table-name' = 'worker'
);
Data Type Mapping
| SQLServer type | Flink SQL type |
|---|---|
| char(n) | CHAR(n) |
| varchar(n) nvarchar(n) nchar(n) | VARCHAR(n) |
| text ntext xml | STRING |
| decimal(p, s) money smallmoney | DECIMAL(p, s) |
| numeric | NUMERIC |
| REAL FLOAT | FLOAT |
| bit | BOOLEAN |
| int | INT |
| tinyint | TINYINT |
| smallint | SMALLINT |
| time (n) | TIME (n) |
| bigint | BIGINT |
| date | DATE |
| datetime2 datetime smalldatetime | TIMESTAMP(n) |
| datetimeoffset | TIMESTAMP_LTZ(3) |