flink sql cdc
这里以同步mysql数据到clickhouse为例
不支持同步mysql表结构变更到clickhouse
一、定义source
CREATE TABLE source_blacklist (
`id` BIGINT,
`type_id` BIGINT,
`value` STRING,
`comment` STRING,
`create_time` Timestamp(3),
`update_time` TIMESTAMP(3),
`deleted` TINYINT,
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'x.x.x.x',
'port' = '30005',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'table-name' = 'blacklist',
'jdbc.properties.useSSL' = 'false'
);
二、定义sink
这里使用了clickhouse-connector
CREATE TABLE sink_blacklist (
`id` BIGINT,
`type_id` BIGINT,
`value` STRING,
`comment` STRING,
`create_time` Timestamp(3),
`update_time` TIMESTAMP(3),
`deleted` TINYINT,
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'x.x.x.x',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'table-name' = 'blacklist',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
三、insert into
SET 'pipeline.name' = 'xxx';
SET 'table.local-time-zone' = 'Asia/Shanghai';
SET 'state.checkpoints.dir' = 'file:///flink-data/cp/xxx';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '1min';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
--
EXECUTE STATEMENT SET
BEGIN
INSERT INTO sink_blacklist SELECT * from source_blacklist;
END;
Loading...