Monitor Metrics
Overview
InLong Sort computes metrics for each node. To enable them, add the option `inlong.metric.labels` to the node's table definition with the value `groupId={groupId}&streamId={streamId}&nodeId={nodeId}`.
Sort exports the metrics through the Flink metric group, so you can collect them with any Flink metric reporter.
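The option value is a list of `key=value` pairs joined by `&`. The Python sketch below splits such a value and checks that the three required keys are present. `parse_metric_labels` is a hypothetical helper for illustration, not InLong's own parser, and InLong may treat malformed or extra pairs differently.

```python
from __future__ import annotations

REQUIRED_KEYS = ("groupId", "streamId", "nodeId")


def parse_metric_labels(value: str) -> dict[str, str]:
    """Split an inlong.metric.labels value into a dict (hypothetical helper)."""
    labels: dict[str, str] = {}
    for pair in value.split("&"):
        if not pair:
            continue  # tolerate an empty segment, e.g. from a trailing '&'
        key, sep, val = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"malformed label pair: {pair!r}")
        labels[key] = val
    # groupId, streamId and nodeId must all be present
    missing = [key for key in REQUIRED_KEYS if key not in labels]
    if missing:
        raise ValueError(f"missing required labels: {missing}")
    return labels


if __name__ == "__main__":
    print(parse_metric_labels("groupId=xxgroup&streamId=xxstream&nodeId=xxnode"))
    # {'groupId': 'xxgroup', 'streamId': 'xxstream', 'nodeId': 'xxnode'}
```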
Metrics
Supported extract nodes
Node-level metrics
| Metric name | Extract node | Description |
|---|---|---|
| groupId_streamId_nodeId_numRecordsIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | Number of input records |
| groupId_streamId_nodeId_numBytesIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | Number of input bytes |
| groupId_streamId_nodeId_numRecordsInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | Input records per second |
| groupId_streamId_nodeId_numBytesInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | Input bytes per second |
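Each `PerSecond` metric is a rate derived from the matching counter (for example `numRecordsInPerSecond` from `numRecordsIn`). Flink's `MeterView` computes such a rate by averaging a counter's growth over a time span, 60 seconds by default. The sketch below illustrates that idea, assuming InLong uses a comparable windowed average; `SlidingRate`, its sampling cadence, and the window length are illustrative assumptions, not InLong settings.

```python
from __future__ import annotations

from collections import deque


class SlidingRate:
    """Per-second rate of a monotonically increasing counter over a sliding window.

    Illustrative sketch; the window length and sampling cadence are assumptions.
    """

    def __init__(self, window_seconds: float = 60.0) -> None:
        self.window = window_seconds
        # (timestamp in seconds, counter value) pairs, oldest first
        self.samples: deque[tuple[float, int]] = deque()

    def record(self, timestamp: float, counter_value: int) -> None:
        """Add a counter sample and drop samples the window no longer needs."""
        self.samples.append((timestamp, counter_value))
        # Keep the newest sample that is at least one window old as the baseline;
        # anything older than that baseline is discarded.
        while len(self.samples) > 2 and timestamp - self.samples[1][0] >= self.window:
            self.samples.popleft()

    def rate(self) -> float:
        """Counter growth per second between the oldest kept and newest samples."""
        if len(self.samples) < 2:
            return 0.0
        (t0, c0), (t1, c1) = self.samples[0], self.samples[-1]
        return (c1 - c0) / (t1 - t0) if t1 > t0 else 0.0


if __name__ == "__main__":
    meter = SlidingRate(window_seconds=60)
    for t, count in [(0, 0), (30, 300), (60, 1200), (90, 1500)]:
        meter.record(t, count)
    print(meter.rate())  # (1500 - 300) / (90 - 30) = 20.0
```

Averaging over a window smooths out bursts; a shorter window reacts faster but is noisier.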
Table-level metrics
These metrics are used for whole-database synchronization, where one node reads many tables.
| Metric name | Extract node | Description | 
|---|---|---|
| groupId_streamId_nodeId_database_table_numRecordsIn | mysql-cdc | Number of input records per table |
| groupId_streamId_nodeId_database_schema_table_numRecordsIn | oracle-cdc,postgresql-cdc | Number of input records per table |
| groupId_streamId_nodeId_database_collection_numRecordsIn | mongodb-cdc | Number of input records per collection |
| groupId_streamId_nodeId_database_table_numBytesIn | mysql-cdc | Number of input bytes per table |
| groupId_streamId_nodeId_database_schema_table_numBytesIn | oracle-cdc,postgresql-cdc | Number of input bytes per table |
| groupId_streamId_nodeId_database_collection_numBytesIn | mongodb-cdc | Number of input bytes per collection |
| groupId_streamId_nodeId_database_table_numRecordsInPerSecond | mysql-cdc | Input records per second per table |
| groupId_streamId_nodeId_database_schema_table_numRecordsInPerSecond | oracle-cdc,postgresql-cdc | Input records per second per table |
| groupId_streamId_nodeId_database_collection_numRecordsInPerSecond | mongodb-cdc | Input records per second per collection |
| groupId_streamId_nodeId_database_table_numBytesInPerSecond | mysql-cdc | Input bytes per second per table |
| groupId_streamId_nodeId_database_schema_table_numBytesInPerSecond | oracle-cdc,postgresql-cdc | Input bytes per second per table |
| groupId_streamId_nodeId_database_collection_numBytesInPerSecond | mongodb-cdc | Input bytes per second per collection |
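Because the table-level names follow a fixed pattern, they can be generated from the node type, for example when building dashboard queries. The sketch below encodes the extract-node table above as data; `TABLE_SCOPES` and `table_level_metric_names` are illustrative names, not part of InLong.

```python
from __future__ import annotations

# Scope components of each extract node's table-level metrics, taken from the table above.
TABLE_SCOPES: dict[str, tuple[str, ...]] = {
    "mysql-cdc": ("database", "table"),
    "oracle-cdc": ("database", "schema", "table"),
    "postgresql-cdc": ("database", "schema", "table"),
    "mongodb-cdc": ("database", "collection"),
}

NODE_SCOPE = ("groupId", "streamId", "nodeId")
BASE_METRICS = ("numRecordsIn", "numBytesIn", "numRecordsInPerSecond", "numBytesInPerSecond")


def table_level_metric_names(node_type: str) -> list[str]:
    """Return the four table-level metric names for an extract node type."""
    scope = "_".join(NODE_SCOPE + TABLE_SCOPES[node_type])
    return [f"{scope}_{metric}" for metric in BASE_METRICS]


if __name__ == "__main__":
    for name in table_level_metric_names("postgresql-cdc"):
        print(name)
```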
Supported load nodes
Node-level metrics
| Metric name | Load node | Description | 
|---|---|---|
| groupId_streamId_nodeId_numRecordsOut | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Number of output records |
| groupId_streamId_nodeId_numBytesOut | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Number of output bytes |
| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Output records per second |
| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Output bytes per second |
| groupId_streamId_nodeId_dirtyRecordsOut | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Number of dirty output records |
| groupId_streamId_nodeId_dirtyBytesOut | clickhouse,elasticsearch,greenplum,hbase,hdfs,hive,iceberg,kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | Number of dirty output bytes |
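The `dirty*` metrics count records that the load node could not write normally, separately from the regular output counters. The sketch below shows that split. `SinkCounters` and `write_batch` are hypothetical, and treating a record as dirty when the write callback raises `ValueError` is an assumption made for illustration; InLong's actual dirty-data rules may differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class SinkCounters:
    """Node-level output counters (hypothetical names mirroring the table above)."""
    num_records_out: int = 0
    num_bytes_out: int = 0
    dirty_records_out: int = 0
    dirty_bytes_out: int = 0


def write_batch(records: Iterable[bytes], write: Callable[[bytes], None], counters: SinkCounters) -> None:
    """Write each record and count it either as normal output or as dirty data.

    Assumption: a record is dirty when `write` raises ValueError.
    """
    for record in records:
        try:
            write(record)
        except ValueError:
            counters.dirty_records_out += 1
            counters.dirty_bytes_out += len(record)
        else:
            counters.num_records_out += 1
            counters.num_bytes_out += len(record)


if __name__ == "__main__":
    def reject_non_alpha(record: bytes) -> None:
        # Stand-in for a real sink write that rejects malformed records.
        if not record.isalpha():
            raise ValueError("malformed record")

    counters = SinkCounters()
    write_batch([b"abc", b"??", b"de"], reject_non_alpha, counters)
    print(counters)
```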
Table-level metrics
| Metric name | Load node | Description | 
|---|---|---|
| groupId_streamId_nodeId_database_table_numRecordsOut | doris,iceberg,starrocks | Number of output records per table |
| groupId_streamId_nodeId_database_schema_table_numRecordsOut | postgresql | Number of output records per table |
| groupId_streamId_nodeId_topic_numRecordsOut | kafka | Number of output records per topic |
| groupId_streamId_nodeId_database_table_numBytesOut | doris,iceberg,starrocks | Number of output bytes per table |
| groupId_streamId_nodeId_database_schema_table_numBytesOut | postgresql | Number of output bytes per table |
| groupId_streamId_nodeId_topic_numBytesOut | kafka | Number of output bytes per topic |
| groupId_streamId_nodeId_database_table_numRecordsOutPerSecond | doris,iceberg,starrocks | Output records per second per table |
| groupId_streamId_nodeId_database_schema_table_numRecordsOutPerSecond | postgresql | Output records per second per table |
| groupId_streamId_nodeId_topic_numRecordsOutPerSecond | kafka | Output records per second per topic |
| groupId_streamId_nodeId_database_table_numBytesOutPerSecond | doris,iceberg,starrocks | Output bytes per second per table |
| groupId_streamId_nodeId_database_schema_table_numBytesOutPerSecond | postgresql | Output bytes per second per table |
| groupId_streamId_nodeId_topic_numBytesOutPerSecond | kafka | Output bytes per second per topic |
| groupId_streamId_nodeId_database_table_dirtyRecordsOut | doris,iceberg,starrocks | Number of dirty output records per table |
| groupId_streamId_nodeId_database_schema_table_dirtyRecordsOut | postgresql | Number of dirty output records per table |
| groupId_streamId_nodeId_topic_dirtyRecordsOut | kafka | Number of dirty output records per topic |
| groupId_streamId_nodeId_database_table_dirtyBytesOut | doris,iceberg,starrocks | Number of dirty output bytes per table |
| groupId_streamId_nodeId_database_schema_table_dirtyBytesOut | postgresql | Number of dirty output bytes per table |
| groupId_streamId_nodeId_topic_dirtyBytesOut | kafka | Number of dirty output bytes per topic |
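When one load node writes to several tables or topics, table-level metrics keep a separate counter per scope key: `(database, table)` for Doris, Iceberg and StarRocks, `(database, schema, table)` for PostgreSQL, and `(topic,)` for Kafka. The hypothetical `TableLevelCounters` class below sketches that bookkeeping and derives node-wide totals, assuming every written record belongs to exactly one scope.

```python
from __future__ import annotations

from collections import defaultdict


class TableLevelCounters:
    """Per-scope record/byte counters for a load node (illustrative sketch).

    A scope is a tuple such as (database, table), (database, schema, table), or (topic,).
    """

    def __init__(self) -> None:
        self.record_counts: defaultdict[tuple[str, ...], int] = defaultdict(int)
        self.byte_counts: defaultdict[tuple[str, ...], int] = defaultdict(int)

    def on_write(self, scope: tuple[str, ...], size: int) -> None:
        """Count one successfully written record of `size` bytes for `scope`."""
        self.record_counts[scope] += 1
        self.byte_counts[scope] += size

    def node_totals(self) -> tuple[int, int]:
        """Node-wide (records, bytes), assuming each record belongs to one scope."""
        return sum(self.record_counts.values()), sum(self.byte_counts.values())


if __name__ == "__main__":
    counters = TableLevelCounters()
    counters.on_write(("db1", "orders"), 100)
    counters.on_write(("db1", "orders"), 50)
    counters.on_write(("db1", "users"), 10)
    print(dict(counters.record_counts))  # {('db1', 'orders'): 2, ('db1', 'users'): 1}
    print(counters.node_totals())  # (3, 160)
```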
Usage
The following example synchronizes data from MySQL to PostgreSQL and shows how to use the metrics.
- Use Flink SQL to define the source and sink tables, each with its own `inlong.metric.labels`:
 
```sql
-- Source: MySQL CDC table with node metrics enabled
CREATE TABLE `table_groupId_streamId_nodeId1` (
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'xxxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'test',
    'scan.incremental.snapshot.enabled' = 'true',
    'server-time-zone' = 'GMT+8',
    'table-name' = 'user',
    'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
);

-- Sink: PostgreSQL table via JDBC with node metrics enabled
CREATE TABLE `table_groupId_streamId_nodeId2` (
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://ip:5432/postgres',
    'username' = 'postgres',
    'password' = 'inlong',
    'table-name' = 'public.user',
    'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
);

INSERT INTO `table_groupId_streamId_nodeId2`
SELECT `id`, `name`, `age`
FROM `table_groupId_streamId_nodeId1`;
```
- Add a Prometheus Pushgateway metric reporter to `flink-conf.yaml`:
 
```yaml
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS
```
Replace `ip` and the port with the host and port of your Pushgateway.
- After the Flink SQL job is running, visit the Pushgateway at http://ip:port to view the metrics.
When the reporter is `org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter`, metric names are prefixed with `flink_taskmanager_job_task_operator`. The full metric names are:
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond`
  - `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond`