CREATE TABLE source_kafka_tenant_users( id STRING, name STRING, authenticated_status SMALLINT, birthdate BIGINT, checked_status SMALLINT, third_party_loginNo STRING, created_time BIGINT, email STRING, head_photo STRING, one_inch_color_white_photo STRING, telephone STRING, last_appeal_status SMALLINT, id_card_num STRING, security_certificate_no STRING, education STRING, stature STRING, nation STRING,politics_status STRING,household_type STRING,marital_status STRING, military_status STRING, front_photo STRING, reverse_photo STRING, background_screening_status SMALLINT, cid STRING,last_sync_time BIGINT,last_cer_sync_time BIGINT, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_users', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tenant_user_credentials( id STRING,created_time BIGINT,credential_type SMALLINT,front_photo STRING,reverse_photo STRING, head_photo STRING,name STRING,number STRING,tenant_user_id STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credentials', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tenant_user_credential_details( id STRING,create_time BIGINT,credentialtype STRING,securityCertificateNo STRING,tenant_user_id STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credential_details', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE sink_dwd_users( id STRING, name STRING, authenticated_status SMALLINT, birthdate STRING, checked_status SMALLINT, third_party_loginNo STRING, created_time TIMESTAMP(3), email STRING, head_photo STRING, one_inch_color_white_photo STRING, telephone STRING, last_appeal_status SMALLINT, id_card_num STRING, security_certificate_no STRING, education STRING, stature STRING,nation STRING,politics_status STRING,household_type STRING,marital_status STRING, military_status STRING, front_photo STRING, reverse_photo STRING, background_screening_status SMALLINT, cid STRING,last_sync_time TIMESTAMP(3),last_cer_sync_time TIMESTAMP(3),last_modifies_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED )with( 'connector' = 'upsert-kafka', 'topic' = 'dwd.olap_dwd.dwd_users', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'properties.partitions' = '5', 'properties.allow.auto.create.topics' = 'true', 'key.format' = 'csv', 'value.format' = 'json', 'value.json.fail-on-missing-field' = 'false', 'value.fields-include' = 'EXCEPT_KEY' ); SET execution.job.name = 'Insert into dwd_users from source_kafka_tenant_users'; INSERT INTO sink_dwd_users ( id,name,authenticated_status,birthdate,checked_status,third_party_loginNo,created_time,email, head_photo,one_inch_color_white_photo,telephone,last_appeal_status,education, stature,nation,politics_status,household_type,marital_status,military_status, background_screening_status, cid,last_sync_time,last_modifies_time ) SELECT u.id,u.name,u.authenticated_status,'1990-01-01',u.checked_status,u.third_party_loginNo,TO_TIMESTAMP(FROM_UNIXTIME(u.created_time/1000, 'yyyy-MM-dd HH:mm:ss')),u.email, u.head_photo,u.one_inch_color_white_photo,u.telephone,u.last_appeal_status,u.education,u.stature, u.nation,u.politics_status,u.household_type,u.marital_status,u.military_status, u.background_screening_status,u.cid,TO_TIMESTAMP(FROM_UNIXTIME(u.last_sync_time/1000, 'yyyy-MM-dd HH:mm:ss')),CURRENT_TIMESTAMP FROM source_kafka_tenant_users u; SET execution.job.name = 'Insert into dwd_users from source_kafka_tenant_user_credentials'; /** 证件信息 */ INSERT INTO sink_dwd_users ( id,name,id_card_num,front_photo, reverse_photo,last_cer_sync_time ) SELECT c.tenant_user_id,c.name,c.number,c.front_photo,c.reverse_photo,TO_TIMESTAMP(FROM_UNIXTIME(c.created_time/1000, 'yyyy-MM-dd HH:mm:ss')) FROM source_kafka_tenant_user_credentials c where c.credential_type in (0,1,2,3,4) and tenant_user_id is not null ; SET execution.job.name = 'Insert into dwd_users from source_kafka_tenant_user_credential_details'; /** 资格证信息 */ INSERT INTO sink_dwd_users ( id,security_certificate_no,last_cer_sync_time ) SELECT c.tenant_user_id,c.securityCertificateNo,TO_TIMESTAMP(FROM_UNIXTIME(c.create_time/1000, 'yyyy-MM-dd HH:mm:ss')) FROM source_kafka_tenant_user_credential_details c where c.credentialtype in ('1') and tenant_user_id is not null; CREATE TABLE sink_jdbc_dwd_users( id STRING, name STRING, authenticated_status SMALLINT, birthdate STRING, checked_status SMALLINT, third_party_loginNo STRING, created_time TIMESTAMP(3), email STRING, head_photo STRING, one_inch_color_white_photo STRING, telephone STRING, last_appeal_status SMALLINT, id_card_num STRING, security_certificate_no STRING, education STRING, stature STRING,nation STRING,politics_status STRING,household_type STRING,marital_status STRING, military_status STRING, front_photo STRING, reverse_photo STRING, background_screening_status SMALLINT, cid STRING,last_sync_time TIMESTAMP(3),last_cer_sync_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED )with( 'connector'='jdbc', 'url'='jdbc:mysql://172.20.0.196:3306/olap_dwd', 'driver'='com.mysql.cj.jdbc.Driver', 'username'='bcx', 'password'='Wstestv5qy#2022', 'table-name'='dwd_users','sink.parallelism'='1' ); insert into sink_jdbc_dwd_users SELECT id,name,authenticated_status,birthdate,checked_status, third_party_loginNo,created_time,email,head_photo, one_inch_color_white_photo,telephone,last_appeal_status,id_card_num,security_certificate_no, education,stature,nation,politics_status,household_type,marital_status,military_status, front_photo,reverse_photo, background_screening_status,cid,last_sync_time,last_cer_sync_time FROM TABLE(HOP(TABLE sink_dwd_users, DESCRIPTOR(last_modifies_time), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));