-- Runbook: sync MySQL table obpm2.tenant_users into obpm2.tenant_users2
-- with Flink SQL (mysql-cdc source -> jdbc sink).
--
-- Shell steps (run from a terminal, not from a SQL client):
--   cd /mnt/e/software/flink/flink-1.18.0
--   ./bin/start-cluster.sh
--   # Start the client
--   ./bin/sql-client.sh

-- ---------------------------------------------------------------------------
-- Target-side DDL.
-- NOTE(review): the ENGINE/CHARSET table options are MySQL syntax, so these
-- statements are presumably meant for a MySQL client rather than the Flink
-- SQL client -- confirm.
-- ---------------------------------------------------------------------------
USE obpm2;

-- IF EXISTS keeps the script rerunnable when the table is not there yet.
DROP TABLE IF EXISTS tenant_users2;

CREATE TABLE tenant_users2 (
    id             VARCHAR(50)  PRIMARY KEY,
    name           VARCHAR(255) NOT NULL,
    last_sync_time TIMESTAMP    NOT NULL
) ENGINE=InnoDB, CHARSET=utf8mb4;

-- Same columns as tenant_users2 but without a primary key.
-- IF NOT EXISTS keeps the script rerunnable, since this table is never dropped here.
CREATE TABLE IF NOT EXISTS tenant_users2_override (
    id             VARCHAR(50),
    name           VARCHAR(255) NOT NULL,
    last_sync_time TIMESTAMP    NOT NULL
) ENGINE=InnoDB, CHARSET=utf8mb4;

-- ---------------------------------------------------------------------------
-- To enable checkpoints, flink-conf.yaml needs the following settings:
--   execution.checkpointing.interval: 3min
--   execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
--   execution.checkpointing.max-concurrent-checkpoints: 1
--   execution.checkpointing.min-pause: 0
--   execution.checkpointing.timeout: 10min
--   execution.checkpointing.mode: EXACTLY_ONCE
--   execution.checkpointing.tolerable-failed-checkpoints: 0
--   execution.checkpointing.unaligned: false
--   state.backend: rocksdb
--   state.checkpoints.dir: file:///mnt/e/software/flink/flink-1.18.0/state_data/checkpoints
--   state.savepoints.dir: file:///mnt/e/software/flink/flink-1.18.0/state_data/savepoints
-- ---------------------------------------------------------------------------

-- ---------------------------------------------------------------------------
-- Flink SQL: table definitions and the sync job.
-- NOTE(review): credentials are hard-coded in plain text in both table
-- definitions below; consider keeping them out of version control.
-- ---------------------------------------------------------------------------

-- CDC source reading obpm2.tenant_users.
CREATE TABLE source_tenant_users (
    id   STRING,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '49.4.21.141',
    'port' = '45611',
    'username' = 'bcx',
    'password' = 'Wstestv5qy#2022',
    'database-name' = 'obpm2',
    'table-name' = 'tenant_users',
    -- Disable SSL on the CDC JDBC connection, matching useSSL=false in the sink URL.
    'jdbc.properties.useSSL' = 'false',
    'server-id' = '5400-5410'
);

-- JDBC sink writing to obpm2.tenant_users2, keyed on id.
CREATE TABLE sink_tenant_users (
    id             STRING PRIMARY KEY NOT ENFORCED,
    name           STRING,
    last_sync_time TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://49.4.21.141:45611/obpm2?characterEncoding=UTF-8&useUnicode=true&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true&zeroDateTimeBehavior=CONVERT_TO_NULL',
    'table-name' = 'tenant_users2',
    'username' = 'bcx',
    'password' = 'Wstestv5qy#2022'
);

-- Sync job: copy id and name from the source and stamp each row with
-- CURRENT_TIMESTAMP as last_sync_time.
-- The OPTIONS hint supplies 'server-id' = '5401-5404' for this query in place
-- of the '5400-5410' range declared on the source table.
INSERT INTO sink_tenant_users (id, name, last_sync_time)
SELECT
    id,
    name,
    CURRENT_TIMESTAMP
FROM source_tenant_users /*+ OPTIONS('server-id'='5401-5404') */;