-- Flink SQL job: copy the dwd_users changelog from Kafka into MySQL.
--
--   source: upsert-kafka topic 'dwd.olap_dwd.dwd_users', keyed by id
--   sink:   JDBC table 'dwd_users' in the olap_dwd database, keyed by id

-- Kafka-side table. The primary key column (id) is read from the CSV-encoded
-- message key; every other physical column comes from the JSON value
-- ('value.fields-include' = 'EXCEPT_KEY').
CREATE TABLE source_dwd_users (
    id                           STRING,
    name                         STRING,
    authenticated_status         SMALLINT,
    birthdate                    STRING,
    checked_status               SMALLINT,
    third_party_loginNo          STRING,
    created_time                 TIMESTAMP(3),
    email                        STRING,
    head_photo                   STRING,
    one_inch_color_white_photo   STRING,
    telephone                    STRING,
    last_appeal_status           SMALLINT,
    id_card_num                  STRING,
    security_certificate_no      STRING,
    education                    STRING,
    stature                      STRING,
    nation                       STRING,
    politics_status              STRING,
    household_type               STRING,
    marital_status               STRING,
    military_status              STRING,
    front_photo                  STRING,
    reverse_photo                STRING,
    background_screening_status  SMALLINT,
    cid                          STRING,
    last_sync_time               TIMESTAMP(3),
    last_cer_sync_time           TIMESTAMP(3),
    -- Declared here but not written to the sink (the sink table has no such column).
    last_modifies_time           TIMESTAMP(3),
    -- Kafka record timestamp, exposed as the event-time attribute.
    `ts`                         TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
    -- Tolerates 10 seconds of out-of-order records. The INSERT below does not
    -- depend on event time; the watermark is kept as part of the table definition.
    WATERMARK FOR ts AS ts - INTERVAL '10' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd.olap_dwd.dwd_users',
    'properties.bootstrap.servers' = '117.78.39.204:9092',
    -- NOTE(review): 'partitions' does not look like a Kafka client setting;
    -- confirm whether this option has any effect.
    'properties.partitions' = '5',
    'properties.allow.auto.create.topics' = 'true',
    'key.format' = 'csv',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'false',
    'value.fields-include' = 'EXCEPT_KEY'
);

-- MySQL-side table. With a primary key declared, the JDBC connector writes
-- in upsert mode, so repeated rows for the same id overwrite each other.
CREATE TABLE sink_jdbc_dwd_users (
    id                           STRING,
    name                         STRING,
    authenticated_status         SMALLINT,
    birthdate                    STRING,
    checked_status               SMALLINT,
    third_party_loginNo          STRING,
    created_time                 TIMESTAMP(3),
    email                        STRING,
    head_photo                   STRING,
    one_inch_color_white_photo   STRING,
    telephone                    STRING,
    last_appeal_status           SMALLINT,
    id_card_num                  STRING,
    security_certificate_no      STRING,
    education                    STRING,
    stature                      STRING,
    nation                       STRING,
    politics_status              STRING,
    household_type               STRING,
    marital_status               STRING,
    military_status              STRING,
    front_photo                  STRING,
    reverse_photo                STRING,
    background_screening_status  SMALLINT,
    cid                          STRING,
    last_sync_time               TIMESTAMP(3),
    last_cer_sync_time           TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.20.0.196:3306/olap_dwd',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    -- NOTE(review): credentials are stored in plain text in this script;
    -- consider supplying them through the deployment's secret mechanism.
    'username' = 'bcx',
    'password' = 'Wstestv5qy#2022',
    'table-name' = 'dwd_users',
    'sink.parallelism' = '1'
);

-- Forward every change from the Kafka table to MySQL.
--
-- The source is read directly rather than through
-- HOP(TABLE source_dwd_users, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES).
-- That window was never aggregated and none of its window_* columns were
-- selected, so its only effect was to assign each row to two overlapping
-- windows (size / slide = 2) and upsert the same values twice per change.
-- The final sink contents are the same; the number of JDBC writes is halved.
INSERT INTO sink_jdbc_dwd_users (
    id,
    name,
    authenticated_status,
    birthdate,
    checked_status,
    third_party_loginNo,
    created_time,
    email,
    head_photo,
    one_inch_color_white_photo,
    telephone,
    last_appeal_status,
    id_card_num,
    security_certificate_no,
    education,
    stature,
    nation,
    politics_status,
    household_type,
    marital_status,
    military_status,
    front_photo,
    reverse_photo,
    background_screening_status,
    cid,
    last_sync_time,
    last_cer_sync_time
)
SELECT
    id,
    name,
    authenticated_status,
    birthdate,
    checked_status,
    third_party_loginNo,
    created_time,
    email,
    head_photo,
    one_inch_color_white_photo,
    telephone,
    last_appeal_status,
    id_card_num,
    security_certificate_no,
    education,
    stature,
    nation,
    politics_status,
    household_type,
    marital_status,
    military_status,
    front_photo,
    reverse_photo,
    background_screening_status,
    cid,
    last_sync_time,
    last_cer_sync_time
FROM source_dwd_users;