SET 'table.exec.sink.not-null-enforcer' = 'DROP'; SET 'execution.runtime-mode' = 'streaming'; SET 'pipeline.auto-watermark-interval' = '200'; SET 'parallelism.default' = '8'; SET 'table.local-time-zone' = 'Asia/Shanghai'; CREATE TABLE sink_collect_employee ( employee_id STRING, user_id STRING, name STRING, id_type TINYINT, id_no STRING, phone STRING, gender TINYINT, ethnic_group STRING, education TINYINT, birth_date date, political_status TINYINT, military TINYINT, occupation_type TINYINT, hire_date TIMESTAMP(3), leave_date TIMESTAMP(3), employed TINYINT, insure_status TINYINT, check_status TINYINT, auth_status TINYINT, last_auth_time TIMESTAMP(3), auth_result STRING, background_status TINYINT, certificate_no STRING, with_certificate TINYINT, security_grade STRING, security_grade_int SMALLINT, record_date TIMESTAMP(3), company_name STRING, office_type STRING, company_id STRING, department STRING, department_id STRING, supervise_office STRING, supervise_office_code STRING, att_site_name STRING, att_site_id STRING, att_site_supervise_office STRING, att_site_supervise_office_code STRING, LASTMODIFIED TIMESTAMP(3), in_att_site TINYINT, profile_photo STRING, DOMAINID STRING, item_domain_id STRING, last_sync_time TIMESTAMP(3), item_region_id STRING, residence TINYINT, height INT, birth_place STRING, marriage TINYINT, emergency_contact STRING, emergency_phone STRING, driver_license STRING, work_years INT, medical_history STRING, address STRING, id_issue_date TIMESTAMP(3), id_expire_date TIMESTAMP(3), id_address STRING, id_back_img STRING, id_front_img STRING, id_person_photo STRING, PRIMARY KEY (employee_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true', 'table-name' = 'dwd_user_info', 'username' = 'bcx', 'password' = 'Wstestv5qy#2022' ); CREATE TABLE sink_collect_employee_basic ( employee_id STRING, user_id STRING, name STRING, education INT, birth_date STRING, political_status INT, military INT, occupation_type TINYINT, last_sync_time TIMESTAMP(3), PRIMARY KEY (employee_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true', 'table-name' = 'employees', 'username' = 'bcx', 'password' = 'Wstestv5qy#2022' ); CREATE TABLE sink_collect_employee_id_card ( employee_id STRING, user_id STRING, id_type TINYINT, id_no STRING, last_sync_time TIMESTAMP(3), PRIMARY KEY (employee_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true', 'table-name' = 'employees', 'username' = 'bcx', 'password' = 'Wstestv5qy#2022' ); CREATE TABLE sink_collect_employee_attendance ( employee_id STRING, att_site_name STRING, att_site_id STRING, att_site_supervise_office STRING, att_site_supervise_office_code STRING, last_sync_time TIMESTAMP(3), PRIMARY KEY (employee_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://49.4.21.141:45611/dwd_warehouse?characterEncoding=UTF-8&connectionTimeZone=GMT&allowMultiQueries=true&autoReconnect=true', 'table-name' = 'employees', 'username' = 'bcx', 'password' = 'Wstestv5qy#2022' ); CREATE TABLE source_tenant_users ( id STRING, name STRING, authenticated_status TINYINT, telephone STRING, user_type TINYINT, selected_credential_id STRING, one_inch_color_white_photo STRING, two_inch_color_blue_photo STRING, birthdate STRING, certificate_image STRING, created_time BIGINT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_users', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id'='consume-collect-employee-group', 'format' = 'debezium-json', 'sink.delivery-guarantee' = 'exactly-once' ); CREATE TABLE source_tenant_employees ( id STRING, tenant_user_id STRING, occupation_type TINYINT ) WITH ( 'connector' = 'kafka', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_employees', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'group-offsets', 'properties.group.id'='consume-collect-employee-group', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE dim_source_tenant_employees ( id STRING, tenant_user_id STRING, occupation_type TINYINT ) WITH ( 'connector' = 'kafka', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_employees', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id'='dim-consume-collect-employee-group', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE source_tenant_user_credentials ( id STRING, tenant_user_id STRING, number STRING, credential_type TINYINT ) WITH ( 'connector' = 'kafka', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credentials', 'properties.group.id'='consume-collect-employee-group', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'group-offsets', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE dim_source_tlk_attendance_site_base_info ( ID STRING, ITEM_attendanceSiteName STRING, ITEM_officePoliceAddress STRING, ITEM_officePoliceAddressID STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_base_info', 'properties.group.id'='dim-consume-collect-employee-group', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE source_tlk_attendance_site_base_info ( ID STRING, ITEM_attendanceSiteName STRING, ITEM_officePoliceAddress STRING, ITEM_officePoliceAddressID STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_base_info', 'properties.group.id'='consume-collect-employee-group', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'group-offsets', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE source_tlk_attendance_site_person_info ( ID STRING, ITEM_securityId STRING, ITEM_attendanceSiteId STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_person_info', 'properties.group.id'='consume-collect-employee-group', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'group-offsets', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); CREATE TABLE dim_source_tlk_attendance_site_person_info ( ID STRING, ITEM_securityId STRING, ITEM_attendanceSiteId STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_person_info', 'properties.group.id'='dim-consume-collect-employee-group', 'properties.bootstrap.servers' = '117.78.39.204:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'properties.auto.offset.reset' = 'earliest' ); SET 'pipeline.name' = 'dim-dwd-kafka-to-collect-employee'; EXECUTE STATEMENT SET BEGIN insert into sink_collect_employee_basic (employee_id,user_id,name,education,birth_date,political_status,military,occupation_type,last_sync_time) select ee.id,u.id,u.name,1,'1990-01-01',1,1,ee.occupation_type,CURRENT_TIMESTAMP from source_tenant_employees ee join source_tenant_users u on ee.tenant_user_id=u.id; insert into sink_collect_employee_basic (employee_id,user_id,name,education,birth_date,political_status,military,occupation_type,last_sync_time) select ee.id,u.id,u.name,1,'1990-01-01',1,1,ee.occupation_type,CURRENT_TIMESTAMP from dim_source_tenant_employees ee join source_tenant_users u on ee.tenant_user_id=u.id; insert into sink_collect_employee_id_card (employee_id,user_id,id_type,id_no,last_sync_time) select ee.id,u.tenant_user_id,u.credential_type,u.number,CURRENT_TIMESTAMP from source_tenant_employees ee join source_tenant_user_credentials u on ee.tenant_user_id=u.tenant_user_id; insert into sink_collect_employee_attendance (employee_id,att_site_name,att_site_id,att_site_supervise_office,att_site_supervise_office_code,last_sync_time) select ee.ITEM_securityId,u.ITEM_attendanceSiteName,u.ID,u.ITEM_officePoliceAddress,u.ITEM_officePoliceAddressID,CURRENT_TIMESTAMP from source_tlk_attendance_site_person_info ee join dim_source_tlk_attendance_site_base_info u on ee.ITEM_attendanceSiteId=u.ID; insert into sink_collect_employee_attendance (employee_id,att_site_name,att_site_id,att_site_supervise_office,att_site_supervise_office_code,last_sync_time) select ee.ITEM_securityId,u.ITEM_attendanceSiteName,u.ID,u.ITEM_officePoliceAddress,u.ITEM_officePoliceAddressID,CURRENT_TIMESTAMP from dim_source_tlk_attendance_site_person_info ee join source_tlk_attendance_site_base_info u on ee.ITEM_attendanceSiteId=u.ID; END;