-- Flink SQL job: kafka-user-credential-sink-dwd-user
--
-- Reads two Debezium-JSON change streams from Kafka (tenant_users and
-- tenant_user_credentials) and writes selected columns of both into the same
-- MySQL table, dwd_user_info, through the JDBC connector.

-- Job-level settings.
SET 'pipeline.name' = 'kafka-user-credential-sink-dwd-user';
-- Rows carrying NULL in a NOT NULL sink column (here the primary key) are
-- dropped instead of failing the job.
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
SET 'execution.runtime-mode' = 'streaming';
-- NOTE(review): no table in this script declares a watermark, so this setting
-- looks like it has no effect here - confirm before relying on it.
SET 'pipeline.auto-watermark-interval' = '200';
SET 'parallelism.default' = '1';

-- Source: change stream of tenant users.
-- Only id, name and created_time are consumed by the statement set below.
-- created_time is treated as epoch milliseconds by the first INSERT.
CREATE TABLE source_tenant_users (
    id                         STRING,
    name                       STRING,
    authenticated_status       TINYINT,
    telephone                  STRING,
    user_type                  TINYINT,
    selected_credential_id     STRING,
    one_inch_color_white_photo STRING,
    two_inch_color_blue_photo  STRING,
    birthdate                  STRING,
    certificate_image          STRING,
    created_time               BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_users',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'properties.group.id' = 'flink-dwd-consumer',
    'format' = 'debezium-json'
);

-- Source: change stream of tenant user credentials.
-- Only tenant_user_id and number are consumed by the statement set below.
CREATE TABLE source_tenant_user_credentials (
    id              STRING,
    name            STRING,
    number          STRING,
    reverse_photo   STRING,
    front_photo     STRING,
    tenant_user_id  STRING,
    address         STRING,
    created_time    BIGINT,
    valid_date_from STRING,
    valid_date_to   STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credentials',
    'properties.group.id' = 'flink-dwd-consumer',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'debezium-json'
);

-- Sink 1 of 2 on the physical table dwd_user_info: user name and creation time.
-- Both sinks target the same MySQL table but declare different column subsets,
-- presumably so that each stream only writes the columns it owns - confirm.
-- NOTE(review): the JDBC user and password are committed in plain text in both
-- sink definitions. Move them to the secret mechanism of the deployment
-- platform and rotate the password.
-- NOTE(review): last_synced_time is declared but is not listed in the INSERT
-- column list, so Flink writes NULL for it on every upsert. Confirm whether
-- the database is expected to fill it or whether the job should set it.
CREATE TABLE sink_dwd_user_info (
    id               STRING,
    name             STRING,
    created_time     TIMESTAMP(3),
    last_synced_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true',
    'table-name' = 'dwd_user_info',
    'username' = 'bcx',
    'password' = 'Wstestv5qy#2022'
);

-- Sink 2 of 2 on the physical table dwd_user_info: credential number, keyed by
-- the owning user id (the INSERT below maps tenant_user_id to id).
-- NOTE(review): the source is keyed by credential id while this sink is keyed
-- by tenant_user_id. A DELETE change event for a credential appears to be
-- applied as a delete by key on dwd_user_info, which would remove the whole
-- user row - verify and filter deletes if that is not intended.
-- NOTE(review): last_synced_time is never populated here either (see sink 1).
CREATE TABLE sink_dwd_user_info_2 (
    id               STRING,
    number           STRING,
    last_synced_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true',
    'table-name' = 'dwd_user_info',
    'username' = 'bcx',
    'password' = 'Wstestv5qy#2022'
);

-- Both pipelines are submitted together as a single Flink job.
--
-- Pipeline 1: tenant_users -> dwd_user_info (id, name, created_time).
--   created_time is converted from epoch milliseconds with TO_TIMESTAMP_LTZ and
--   cast to TIMESTAMP(3) in the session time zone. The earlier form
--   TO_TIMESTAMP(FROM_UNIXTIME(created_time/1000, ...)) used the same time zone
--   but dropped the millisecond part through integer division and a
--   seconds-only string round-trip.
--
-- Pipeline 2: tenant_user_credentials -> dwd_user_info (id, number).
--   Credentials without an owning user are skipped because tenant_user_id
--   becomes the sink primary key.
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO sink_dwd_user_info (id, name, created_time)
    SELECT
        u.id,
        u.name,
        CAST(TO_TIMESTAMP_LTZ(u.created_time, 3) AS TIMESTAMP(3))
    FROM source_tenant_users AS u;

    INSERT INTO sink_dwd_user_info_2 (id, number)
    SELECT
        c.tenant_user_id,
        c.number
    FROM source_tenant_user_credentials AS c
    WHERE c.tenant_user_id IS NOT NULL;
END;