{
  "name": "实时归集人员信息",
  "jdbcMeta": {
    "url": "jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true",
    "userName": "v5_enterprise2021",
    "password": "Prod_v5#202109",
    "sqlMetaDefinitions": [
      {
        "tableName": "rd_employee_summary",
        "sql": "create table jdbc_insert_rd_employee_summary(id STRING,supervise_region_code STRING,name STRING,head_photo STRING,contact STRING, company_name STRING,status INT,checked_status INT,authenticated_status INT,military_status STRING,security_certificate_no STRING,occupation_type INT,hired_date TIMESTAMP(3),leave_time TIMESTAMP(3),insure INT,organization_id STRING,tenant_user_id STRING,tenant_employee_id STRING,last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),device_number STRING,tenant_im_user_id STRING,idCard_no STRING,cid STRING,PRIMARY KEY (id) NOT ENFORCED) "
      }
    ]
  },
  "kafkaMeta": {
    "autoOffsetRest": "earliest",
    "scanStartupMode": "earliest-offset",
    "enableAutoCommit": "true",
    "bootstrapServer": "192.168.0.52:9092",
    "topicDefinitions": [
      {
        "topic": "obpm2.binlog-cdc.topic.data.tenant_organizations",
        "sql": "create table kafka_tenant_organizations(id STRING,approved_information_status INT, name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING) "
      },
      {
        "topic": "obpm2.binlog-cdc.topic.data.tenant_user_credentials",
        "sql": "create table kafka_tenant_user_credentials(id STRING,`record_time` VARCHAR METADATA FROM 'timestamp',created_time BIGINT,address STRING,credential_type INT,front_photo STRING,head_photo STRING,name STRING,number STRING,reverse_photo STRING,selected INT,valid_date_from BIGINT,valid_date_to BIGINT,tenant_user_id STRING,mark STRING,district_code STRING,city_code STRING,province_code STRING, ts as record_time, WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) "
      },
      {
        "topic": "obpm2.binlog-cdc.topic.data.tenant_users",
        "sql": "create table kafka_tenant_users(id STRING,`record_time` VARCHAR METADATA FROM 'timestamp',authenticated_status INT,authenticated_result STRING,authenticated_time BIGINT,checked_status INT,third_party_loginNo STRING,created_time BIGINT ,email STRING,habit_setting STRING,head_photo STRING,im_identity STRING,lon_lat_json STRING,name STRING,sex INT,stature STRING,telephone STRING,user_type INT,selected_credential_id STRING,household_type STRING,one_inch_color_white_photo STRING,two_inch_color_blue_photo STRING,education STRING,marital_status STRING,military_status STRING,nation STRING,native_place STRING,politics_status STRING,cid STRING,wechat STRING,wechat_nicky STRING,last_updated_time BIGINT,background_screening_status INT,last_background_screening_time BIGINT,emergency_contact STRING,emergency_phone STRING,license_level STRING,place_of_now_address STRING,place_of_now_city_code STRING,place_of_now_city_name STRING,place_of_now_district_code STRING,place_of_now_district_name STRING,place_of_now_province_code STRING,place_of_now_province_name STRING,third_party_login_no STRING,last_sync_time BIGINT) "
      },
      {
        "topic": "obpm2.binlog-cdc.topic.data.tenant_employees",
        "sql": "create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT) "
      },
      {
        "topic": "baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_base_info",
        "sql": "create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) "
      },
      {
        "topic": "baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_person_info",
        "sql": "create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_securityName STRING,LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE) "
      }
    ]
  },
  "executeSql": [
    "INSERT INTO jdbc_insert_rd_employee_summary (\n id,\n supervise_region_code,\n name,\n head_photo,\n contact,\n company_name,\n status,\n checked_status,\n authenticated_status,\n military_status,\n security_certificate_no,\n occupation_type,\n hired_date,\n leave_time,\n insure,\n organization_id,\n tenant_user_id,\n tenant_employee_id,\n last_sync_time,\n last_updated_time,\n device_number,\n tenant_im_user_id,\n idCard_no,\n cid\n) SELECT\n ee.id,\n '#fk-t399',\n u.`name`,\n c.head_photo,\n u.telephone,\n o.`name` AS orgName,\n ee.`status`,\n u.checked_status,\n u.authenticated_status,\n u.military_status,(\n d.number\n ) AS securityCredentialNo,\n\tee.occupation_type,\n\tIFNULL( TO_TIMESTAMP ( FROM_UNIXTIME( ee.hired_date / 1000, 'yyyy-MM-dd HH:mm:ss' )), CURRENT_TIMESTAMP ) AS hired_date,\n\tTO_TIMESTAMP (\n\tFROM_UNIXTIME( ee.leave_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS leave_time,\n\tifnull( ee.insure, 0 ),\n\tee.organization_id,\n\tee.tenant_user_id,\n\tee.id,\n\tTO_TIMESTAMP (\n\tFROM_UNIXTIME( ee.last_sync_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS last_sync_time,\n\tTO_TIMESTAMP (\n\tFROM_UNIXTIME( ee.last_updated_time / 1000, 'yyyy-MM-dd HH:mm:ss' )) AS last_updated_time,\n\t'',\n\tifnull( u.im_identity, '#' ),\n\tc.number,\n\tu.cid\nFROM\n kafka_tenant_users u\n JOIN kafka_tenant_user_credentials c ON c.id = u.selected_credential_id\n JOIN kafka_tenant_employees ee ON ee.tenant_user_id = u.id\n JOIN kafka_tenant_organizations o ON o.id = ee.organization_id\n LEFT JOIN kafka_tenant_user_credentials d on d.tenant_user_id=u.id and d.credential_type = 7\nWHERE\n ee.`status` =0 and o.approved_information_status=1"
  ]
}