-- Flink SQL job: consume Debezium CDC changelogs for tenant_users and
-- tenant_user_credentials from Kafka and upsert partial user-info records
-- into DWD Kafka topics, keyed by user id.

-- Job-level settings.
SET 'pipeline.name' = 'kafka-user-credential-sink-user-partial-info';
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
SET 'execution.runtime-mode' = 'streaming';
SET 'pipeline.auto-watermark-interval' = '200';
SET 'parallelism.default' = '1';

-- CDC source: tenant_users binlog topic (Debezium JSON).
CREATE TABLE source_tenant_users (
    id STRING,
    name STRING,
    authenticated_status TINYINT,
    telephone STRING,
    user_type TINYINT,
    selected_credential_id STRING,
    one_inch_color_white_photo STRING,
    two_inch_color_blue_photo STRING,
    birthdate STRING,
    certificate_image STRING,
    created_time BIGINT,               -- epoch milliseconds
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_users',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'properties.group.id' = 'flink-dwd-consumer',
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'debezium-json'
);

-- CDC source: tenant_user_credentials binlog topic (Debezium JSON).
CREATE TABLE source_tenant_user_credentials (
    id STRING,
    name STRING,
    number STRING,
    reverse_photo STRING,
    front_photo STRING,
    tenant_user_id STRING,
    address STRING,
    created_time BIGINT,
    valid_date_from STRING,
    valid_date_to STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credentials',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'properties.group.id' = 'flink-dwd-consumer',
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'debezium-json'
);

-- Upsert sink: wide user-info table keyed by id. With 'EXCEPT_KEY' the value
-- payload omits the key column; id travels in the record key (csv format).
-- Note: 'properties.*' options are forwarded to the Kafka client as-is;
-- 'num.partitions' is a broker-side setting, so the producer does not use it
-- to create partitions.
CREATE TABLE sink_kafka_user_info (
    id STRING,
    name STRING,
    authenticated_status TINYINT,
    telephone STRING,
    user_type TINYINT,
    selected_credential_id STRING,
    one_inch_color_white_photo STRING,
    two_inch_color_blue_photo STRING,
    birthdate STRING,
    certificate_image STRING,
    number STRING,
    reverse_photo STRING,
    front_photo STRING,
    valid_date_from STRING,
    valid_date_to STRING,
    created_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd-sink-user-dwd-info-upsert',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'properties.allow.auto.create.topics' = 'true',
    'properties.num.partitions' = '10',
    'key.format' = 'csv',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY',
    'sink.parallelism' = '5'
);

-- Second upsert sink with the same schema, fed from the first sink topic.
CREATE TABLE sink_kafka_user_info_2 (
    id STRING,
    name STRING,
    authenticated_status TINYINT,
    telephone STRING,
    user_type TINYINT,
    selected_credential_id STRING,
    one_inch_color_white_photo STRING,
    two_inch_color_blue_photo STRING,
    birthdate STRING,
    certificate_image STRING,
    number STRING,
    reverse_photo STRING,
    front_photo STRING,
    valid_date_from STRING,
    valid_date_to STRING,
    created_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd-sink-user-dwd-info-upsert-2',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    'key.format' = 'csv',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY'
);

-- Submit all three INSERTs as one job. Each INSERT into sink_kafka_user_info
-- writes a partial column set for the same key; columns not listed are NULL
-- in that record. The third INSERT mirrors the first sink topic into the
-- second upsert topic.
EXECUTE STATEMENT SET
BEGIN

INSERT INTO sink_kafka_user_info (
    id, name, authenticated_status, telephone, user_type, selected_credential_id,
    one_inch_color_white_photo, two_inch_color_blue_photo, birthdate,
    certificate_image, created_time
)
SELECT
    u.id,
    u.name,
    u.authenticated_status,
    u.telephone,
    u.user_type,
    u.selected_credential_id,
    u.one_inch_color_white_photo,
    u.two_inch_color_blue_photo,
    u.birthdate,
    u.certificate_image,
    TO_TIMESTAMP(FROM_UNIXTIME(u.created_time / 1000, 'yyyy-MM-dd HH:mm:ss'))
FROM source_tenant_users AS u;

INSERT INTO sink_kafka_user_info (
    id, number, reverse_photo, front_photo, valid_date_from, valid_date_to
)
SELECT
    u1.tenant_user_id,
    u1.number,
    u1.reverse_photo,
    u1.front_photo,
    u1.valid_date_from,
    u1.valid_date_to
FROM source_tenant_user_credentials AS u1
WHERE u1.tenant_user_id IS NOT NULL;

INSERT INTO sink_kafka_user_info_2
SELECT * FROM sink_kafka_user_info;

END;
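
-- A minimal sanity check, not part of the job above (assumes an interactive
-- Flink SQL client session; the literal 1700000000123 is just an illustrative
-- epoch-millisecond value): created_time is converted with integer division,
-- so sub-second precision is dropped, and FROM_UNIXTIME renders the string in
-- the session's 'table.local-time-zone'.
--
--   SELECT TO_TIMESTAMP(FROM_UNIXTIME(1700000000123 / 1000, 'yyyy-MM-dd HH:mm:ss')) AS created_ts;
--
-- With a UTC session time zone this yields 2023-11-14 22:13:20.000.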