SET 'execution.checkpointing.interval' = '30s'; CREATE TABLE source_kafka_tenant_users( id STRING, name STRING, authenticated_status INT, birthdate BIGINT, checked_status INT, third_party_loginNo STRING, created_time BIGINT, email STRING, head_photo STRING, one_inch_color_white_photo STRING, telephone STRING, last_appeal_status INT, id_card_num STRING, security_certificate_no STRING, education STRING, stature STRING, nation STRING,politics_status STRING,household_type STRING,marital_status STRING, military_status STRING, front_photo STRING, reverse_photo STRING, background_screening_status INT, cid STRING,last_sync_time BIGINT,last_cer_sync_time BIGINT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='mysql-cdc', 'hostname'='49.4.21.141', 'port' = '45611', 'username'='bcx', 'password'='Wstestv5qy#2022', 'database-name' = 'obpm2', 'table-name'='tenant_users', 'server-id'='5400-5403', 'scan.startup.mode'='earliest-offset'); CREATE TABLE source_kafka_tenant_user_credentials( id STRING,created_time BIGINT,credential_type INT,front_photo STRING,reverse_photo STRING, head_photo STRING,name STRING,number STRING,tenant_user_id STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='mysql-cdc', 'hostname'='49.4.21.141', 'port' = '45611', 'username'='bcx', 'password'='Wstestv5qy#2022', 'database-name' = 'obpm2', 'table-name'='tenant_user_credentials', 'server-id'='5404-5407', 'scan.startup.mode'='earliest-offset' ); CREATE TABLE IF NOT EXISTS sink_dwd_users ( id STRING, name STRING, authenticated_status INT, birthdate STRING, checked_status INT, third_party_loginNo STRING, created_time TIMESTAMP(3), email STRING, head_photo STRING, one_inch_color_white_photo STRING, telephone STRING, last_appeal_status INT, education STRING, stature STRING,nation STRING,politics_status STRING,household_type STRING,marital_status STRING, military_status STRING, background_screening_status INT, cid STRING,last_sync_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', 'load-url' = '127.0.0.1:8030', 'database-name' = 'olap_dwd', 'table-name' = 'dwd_users', 'username' = 'root', 'password' = '', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.column_separator' = '\x01', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.partial_update'='true' ); INSERT INTO sink_dwd_users ( id,name,authenticated_status,birthdate,checked_status,third_party_loginNo,created_time,email, head_photo,one_inch_color_white_photo,telephone,last_appeal_status,education, stature,nation,politics_status,household_type,marital_status,military_status, background_screening_status, cid,last_sync_time ) SELECT u.id,u.name,u.authenticated_status,'1990-01-01',u.checked_status,u.third_party_loginNo,TO_TIMESTAMP(FROM_UNIXTIME(u.created_time/1000, 'yyyy-MM-dd HH:mm:ss')),u.email, u.head_photo,u.one_inch_color_white_photo,u.telephone,u.last_appeal_status,u.education,u.stature, u.nation,u.politics_status,u.household_type,u.marital_status,u.military_status, u.background_screening_status,u.cid,TO_TIMESTAMP(FROM_UNIXTIME(u.last_sync_time/1000, 'yyyy-MM-dd HH:mm:ss')) FROM source_kafka_tenant_users u; CREATE TABLE source_kafka_tenant_user_credential_details( id STRING,xm STRING,create_time BIGINT,credentialtype STRING,securityCertificateNo STRING,tenant_user_id STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='mysql-cdc', 'hostname'='49.4.21.141', 'port' = '45611', 'username'='bcx', 'password'='Wstestv5qy#2022', 'database-name' = 'obpm2', 'table-name'='tenant_user_credential_details', 'server-id'='5407-5409', 'scan.startup.mode'='earliest-offset'); SET execution.job.name = 'Insert into dwd_users from source_kafka_tenant_user_credentials'; CREATE TABLE IF NOT EXISTS sink_dwd_user_credentials ( id STRING, name STRING, id_card_num STRING,front_photo STRING, reverse_photo STRING,last_cer_sync_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', 'load-url' = '127.0.0.1:8030', 'database-name' = 'olap_dwd', 'table-name' = 'dwd_users', 'username' = 'root', 'password' = '', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.column_separator' = '\x01', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.partial_update'='true' ); /** 证件信息 */ INSERT INTO sink_dwd_user_credentials ( id,name,id_card_num,front_photo, reverse_photo,last_cer_sync_time ) SELECT c.tenant_user_id,c.name,c.number,c.front_photo,c.reverse_photo,TO_TIMESTAMP(FROM_UNIXTIME(c.created_time/1000, 'yyyy-MM-dd HH:mm:ss')) FROM source_kafka_tenant_user_credentials c where c.credential_type in (0,1,2,3,4) and c.tenant_user_id is not null ; SET execution.job.name = 'Insert into dwd_users from source_kafka_tenant_user_credential_details'; CREATE TABLE IF NOT EXISTS sink_dwd_user_credential_details ( id STRING,name STRING,security_certificate_no STRING,last_cer_sync_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', 'load-url' = '127.0.0.1:8030', 'database-name' = 'olap_dwd', 'table-name' = 'dwd_users', 'username' = 'root', 'password' = '', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.column_separator' = '\x01', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.partial_update'='true' ); /** 资格证信息 */ INSERT INTO sink_dwd_user_credential_details ( id,name,security_certificate_no,last_cer_sync_time ) SELECT c.tenant_user_id,c.xm,c.securityCertificateNo,TO_TIMESTAMP(FROM_UNIXTIME(c.create_time/1000, 'yyyy-MM-dd HH:mm:ss')) FROM source_kafka_tenant_user_credential_details c where c.credentialtype in ('1') and c.tenant_user_id is not null;