{ "param1": "{\"job.name\":\"简化版本-CheckPoint-实时归集人员信息-3.0\",\"checkpoint.mode\":\"file\",\"checkpoint.hdfs.config\":\"core-site-default.xml\",\"checkpoint.disk.location\":\"/tmp/cp-table-employee-np-v3\"}", "param2": "{\n \"name\": \"实时归集人员信息\",\n \"jdbcMeta\": {\n \"url\": \"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\",\n \"userName\": \"v5_enterprise2021\",\n \"password\": \"Prod_v5#202109\",\n \"sqlMetaDefinitions\": [\n {\n \"tableName\": \"rd_employee_summary\",\n \"sql\": \"create table jdbc_insert_rd_employee_summary(id STRING,supervise_region_code STRING,name STRING,head_photo STRING,contact STRING, company_name STRING,status INT,checked_status INT,authenticated_status INT,military_status STRING,security_certificate_no STRING,occupation_type INT,hired_date TIMESTAMP(3),leave_time TIMESTAMP(3),insure INT,organization_id STRING,tenant_user_id STRING,tenant_employee_id STRING,last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),device_number STRING,tenant_im_user_id STRING,idCard_no STRING,cid STRING,PRIMARY KEY (id) NOT ENFORCED) \"\n }\n ]\n },\n \"kafkaMeta\": {\n \"autoOffsetRest\": \"earliest\",\n \"scanStartupMode\": \"earliest-offset\",\n \"enableAutoCommit\": \"true\",\n \"bootstrapServer\": \"192.168.0.52:9092\",\n \"topicDefinitions\": [\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_organizations\",\n \"sql\": \"create table kafka_tenant_organizations(id STRING,approved_information_status INT, name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING) \"\n },\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_user_credentials\",\n \"sql\": \"create table kafka_tenant_user_credentials(id STRING,`record_time` VARCHAR METADATA FROM 'timestamp',created_time BIGINT,address STRING,credential_type INT,front_photo STRING,head_photo STRING,name STRING,number STRING,reverse_photo STRING,selected INT,valid_date_from BIGINT,valid_date_to BIGINT,tenant_user_id STRING,mark STRING,district_code STRING,city_code STRING,province_code STRING, ts as record_time, WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) \"\n },\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_users\",\n \"sql\": \"create table kafka_tenant_users(id STRING,`record_time` VARCHAR METADATA FROM 'timestamp',authenticated_status INT,authenticated_result STRING,authenticated_time BIGINT,checked_status INT,third_party_loginNo STRING,created_time BIGINT ,email STRING,habit_setting STRING,head_photo STRING,im_identity STRING,lon_lat_json STRING,name STRING,sex INT,stature STRING,telephone STRING,user_type INT,selected_credential_id STRING,household_type STRING,one_inch_color_white_photo STRING,two_inch_color_blue_photo STRING,education STRING,marital_status STRING,military_status STRING,nation STRING,native_place STRING,politics_status STRING,cid STRING,wechat STRING,wechat_nicky STRING,last_updated_time BIGINT,background_screening_status INT,last_background_screening_time BIGINT,emergency_contact STRING,emergency_phone STRING,license_level STRING,place_of_now_address STRING,place_of_now_city_code STRING,place_of_now_city_name STRING,place_of_now_district_code STRING,place_of_now_district_name STRING,place_of_now_province_code STRING,place_of_now_province_name STRING,third_party_login_no STRING,last_sync_time BIGINT) \"\n },\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_employees\",\n \"sql\": \"create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_base_info\",\n \"sql\": \"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_person_info\",\n \"sql\": \"create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_securityName STRING,LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) \"\n }\n ]\n },\n \"executeSql\": [\n \"INSERT INTO jdbc_insert_rd_employee_summary (\\n id,\\n supervise_region_code,\\n name,\\n head_photo,\\n contact,\\n company_name,\\n status,\\n checked_status,\\n authenticated_status,\\n military_status,\\n security_certificate_no,\\n occupation_type,\\n hired_date,\\n leave_time,\\n insure,\\n organization_id,\\n tenant_user_id,\\n tenant_employee_id,\\n last_sync_time,\\n last_updated_time,\\n device_number,\\n tenant_im_user_id,\\n idCard_no,\\n cid\\n) SELECT\\n ee.id,\\n '#fk-t399',\\n u.`name`,\\n c.head_photo,\\n u.telephone,\\n o.`name` AS orgName,\\n ee.`status`,\\n u.checked_status,\\n u.authenticated_status,\\n u.military_status,(\\n d.number\\n ) AS securityCredentialNo,\\n\\tee.occupation_type,\\n\\tIFNULL( TO_TIMESTAMP ( FROM_UNIXTIME( ee.hired_date / 1000, 'yyyy-MM-dd HH:mm:ss' )), CURRENT_TIMESTAMP ) AS hired_date,\\n\\tTO_TIMESTAMP (\\n\\tFROM_UNIXTIME( ee.leave_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS leave_time,\\n\\tifnull( ee.insure, 0 ),\\n\\tee.organization_id,\\n\\tee.tenant_user_id,\\n\\tee.id,\\n\\tTO_TIMESTAMP (\\n\\tFROM_UNIXTIME( ee.last_sync_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS last_sync_time,\\n\\tTO_TIMESTAMP (\\n\\tFROM_UNIXTIME( ee.last_updated_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS last_updated_time,\\n\\t'',\\n\\tifnull( u.im_identity, '#' ),\\n\\tc.number,\\n\\tu.cid\\nFROM\\n kafka_tenant_users u\\n JOIN kafka_tenant_user_credentials c ON c.id = u.selected_credential_id\\n JOIN kafka_tenant_employees ee ON ee.tenant_user_id = u.id\\n JOIN kafka_tenant_organizations o ON o.id = ee.organization_id\\n LEFT JOIN kafka_tenant_user_credentials d on d.tenant_user_id=u.id and d.credential_type = 7\\nWHERE\\n ee.`status` =0 and o.approved_information_status=1\"\n ]\n}" }