{ "name": "实时归集企业-驻勤-驻勤人员-人员信息", "jdbcMeta": { "url": "jdbc:mysql://49.4.21.141:45611/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true", "userName": "bcx", "password": "Wstestv5qy#2022", "sqlMetaDefinitions": [ { "tableName": "rd_company_summary", "sql": "create table jdbc_insert_rd_company_summary(id STRING,status INT, name STRING,legal STRING,legal_telephone STRING,institutional STRING,register_address STRING,business_address STRING,service_scope STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)" }, { "tableName": "rd_security_station_summary", "sql": "create table jdbc_insert_rd_security_station_summary(id STRING,name STRING,station_type STRING,company_name STRING,principal_id STRING,principal_name STRING,principal_contact STRING,service_scope STRING,address STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),site_state STRING,begin_date TIMESTAMP(3),end_date TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)" }, { "tableName": "rd_employee_summary", "sql": "create table jdbc_insert_rd_employee_summary(id STRING,name STRING,head_photo STRING,contact STRING, company_name STRING,status INT,checked_status INT,authenticated_status INT,military_status STRING,security_certificate_no STRING,occupation_type INT,hired_date TIMESTAMP(3),leave_time TIMESTAMP(3),insure INT,organization_id STRING,tenant_user_id STRING,tenant_employee_id STRING,last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),device_number STRING,tenant_im_user_id STRING,idCard_no STRING,cid STRING,PRIMARY KEY (id) NOT ENFORCED) " }, { "tableName": "rd_security_station_person_summary", "sql": "create table jdbc_insert_rd_security_station_person_summary(id STRING,employee_id STRING,security_station_id STRING,PRIMARY KEY (id) NOT ENFORCED)" } ] }, "kafkaMeta": { "autoOffsetRest": "earliest", "scanStartupMode": "earliest-offset", "enableAutoCommit": "true", "bootstrapServer": "117.78.39.204:9092", "topicDefinitions": [ { "topic": "obpm2.binlog-cdc.topic.tenant_organizations", "sql": "create table kafka_tenant_organizations(id STRING,name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING,PRIMARY KEY (id) NOT ENFORCED) " }, { "topic": "obpm2.binlog-cdc.topic.tenant_user_credentials", "sql": "create table kafka_tenant_user_credentials(id STRING,created_time BIGINT,address STRING,credential_type INT,front_photo STRING,head_photo STRING,name STRING,number STRING,reverse_photo STRING,selected INT,valid_date_from BIGINT,valid_date_to BIGINT,tenant_user_id STRING,mark STRING,district_code STRING,city_code STRING,province_code STRING, PRIMARY KEY (id) NOT ENFORCED) " }, { "topic": "obpm2.binlog-cdc.topic.tenant_users", "sql": "create table kafka_tenant_users(id STRING,authenticated_status INT,authenticated_result STRING,authenticated_time BIGINT,checked_status INT,third_party_loginNo STRING,created_time BIGINT ,email STRING,habit_setting STRING,head_photo STRING,im_identity STRING,lon_lat_json STRING,name STRING,sex INT,stature STRING,telephone STRING,user_type INT,selected_credential_id STRING,household_type STRING,one_inch_color_white_photo STRING,two_inch_color_blue_photo STRING,education STRING,marital_status STRING,military_status STRING,nation STRING,native_place STRING,politics_status STRING,cid STRING,wechat STRING,wechat_nicky STRING,last_updated_time BIGINT,background_screening_status INT,last_background_screening_time BIGINT,emergency_contact STRING,emergency_phone STRING,license_level STRING,place_of_now_address STRING,place_of_now_city_code STRING,place_of_now_city_name STRING,place_of_now_district_code STRING,place_of_now_district_name STRING,place_of_now_province_code STRING,place_of_now_province_name STRING,third_party_login_no STRING,last_sync_time BIGINT, PRIMARY KEY (id) NOT ENFORCED) " }, { "topic": "obpm2.binlog-cdc.topic.tenant_employees", "sql": "create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT, PRIMARY KEY (id) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_companyinformation", "sql": "create table kafka_tlk_companyinformation(ID STRING,ITEM_companyStatus STRING, DOMAINID STRING, ITEM_legalPerson STRING,ITEM_legalPersonPhone STRING,ITEM_businessScope STRING,LASTMODIFIED BIGINT,PRIMARY KEY (ID) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_SetSupervise", "sql": "create table kafka_tlk_SetSupervise(ID STRING,ITEM_REGISTERADDRESS STRING,ITEM_REGISTERADDRESSID STRING,ITEM_REGISTERPOLICEADDRESS STRING,ITEM_OFFICEPOLICEADDRESSID STRING,ITEM_OFFICEPOLICEIDINDEX STRING,ITEM_REGISTERPOLICEADDRESSID STRING,ITEM_REGISTERPOLICEIDINDEX STRING,ITEM_DOMAIN_ID STRING,ITEM_B STRING,ITEM_FLAG INT, PRIMARY KEY (ID) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_attendance_site_base_info", "sql": "create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_OFFICEPOLICEADDRESSID STRING,ITEM_ATTENDANCESITENAME STRING,ITEM_ATTENDANCESITETYPE STRING,DOMAINID STRING,ITEM_PRINCIPAL STRING,ITEM_PRINCIPALNAME STRING,ITEM_PRINCIPALPHONENO STRING,ITEM_SERVEOBJECTNAME STRING,ITEM_ATTENDANCESITEADDRESS STRING,ITEM_ATTENDANCESITEFULLADDRESS STRING, ITEM_ATTENDANCESITELONGITUDE STRING,ITEM_ATTENDANCESITELATITUDE STRING,ITEM_ATTENDANCESTARTDATE BIGINT,ITEM_ATTENDANCEENDDATE BIGINT,ITEM_ATTENDANCESITESTATE STRING, LASTMODIFIED BIGINT, PRIMARY KEY (ID) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_attendance_site_person_info", "sql": "create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_ATTENDANCESITEID STRING,ITEM_SECURITYID STRING,ITEM_SECURITYNAME STRING,PRIMARY KEY (ID) NOT ENFORCED) " } ] }, "executeSql": [ "insert into jdbc_insert_rd_employee_summary(id,name,head_photo,contact, company_name,status,checked_status,authenticated_status,military_status,security_certificate_no,occupation_type,hired_date,leave_time,insure,organization_id,tenant_user_id,tenant_employee_id,last_sync_time,last_updated_time,device_number,tenant_im_user_id,idCard_no,cid)select distinct ee.id,u.`name`,c.head_photo,u.telephone,o.`name` as orgName,ee.`status`,u.checked_status,u.authenticated_status,u.military_status,(select c2.number from kafka_tenant_user_credentials c2 where c2.tenant_user_id=u.id and c2.credential_type=7 limit 1) as securityCredentialNo,ee.occupation_type,IFNULL(TO_TIMESTAMP(FROM_UNIXTIME(ee.hired_date/1000, 'yyyy-MM-dd HH:mm:ss')),CURRENT_TIMESTAMP) as hired_date,TO_TIMESTAMP(FROM_UNIXTIME(ee.leave_time/1000, 'yyyy-MM-dd HH:mm:ss')) as leave_time,ifnull(ee.insure,0),ee.organization_id,ee.tenant_user_id,ee.id,TO_TIMESTAMP(FROM_UNIXTIME(ee.last_sync_time/1000, 'yyyy-MM-dd HH:mm:ss')) as last_sync_time,TO_TIMESTAMP(FROM_UNIXTIME(ee.last_updated_time/1000, 'yyyy-MM-dd HH:mm:ss')) as last_updated_time,'',ifnull(u.im_identity,'#'), c.number,u.cid from kafka_tenant_users u left join kafka_tenant_user_credentials c on u.id=c.tenant_user_id and c.credential_type=0 join kafka_tenant_employees ee on ee.tenant_user_id=u.id join kafka_tenant_organizations o on o.id=ee.organization_id where ee.`status`=0", "insert into jdbc_insert_rd_company_summary(id,name,status,legal,legal_telephone,institutional,register_address, business_address,service_scope,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,supervise_region_code,lon_lat)select o.id,o.`name`,(case when ITEM_companyStatus='注销' then 2 when ITEM_companyStatus='撤销' then 1 else 0 end) as ITEM_companyStatus,ci.ITEM_legalPerson,ci.ITEM_legalPersonPhone,ifnull(o.institutional_code,''),o.place_of_register_address,o.place_of_business_address,ci.ITEM_businessScope,o.id,'#','#supervise_depart_id',TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),'110000', convert2Point(o.longitude,o.latitude) as lonlat from kafka_tenant_organizations o left join kafka_tlk_companyinformation ci on o.id=ci.DOMAINID", "insert into jdbc_insert_rd_security_station_summary(id,name,station_type,company_name,principal_id,principal_name, principal_contact,service_scope,address,lon_lat,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,site_state,begin_date,end_date,supervise_region_code)select bi.ID,bi.ITEM_ATTENDANCESITENAME,bi.ITEM_ATTENDANCESITETYPE,o.name,bi.ITEM_PRINCIPAL,bi.ITEM_PRINCIPALNAME,bi.ITEM_PRINCIPALPHONENO,bi.ITEM_SERVEOBJECTNAME,bi.ITEM_ATTENDANCESITEFULLADDRESS, convert2Point(ITEM_ATTENDANCESITELONGITUDE,ITEM_ATTENDANCESITELATITUDE) as lonlat, o.id,'#',ifnull(bi.ITEM_OFFICEPOLICEADDRESSID,'#') as supervise_depart_id,TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')), TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),bi.ITEM_ATTENDANCESITESTATE,TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_ATTENDANCESTARTDATE/1000, 'yyyy-MM-dd HH:mm:ss')),TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_ATTENDANCEENDDATE/1000, 'yyyy-MM-dd HH:mm:ss')),'110000' from kafka_tlk_attendance_site_base_info bi join kafka_tenant_organizations o on o.id=bi.DOMAINID ", "insert into jdbc_insert_rd_security_station_person_summary(id,employee_id,security_station_id)select ID,ITEM_SECURITYID,ITEM_ATTENDANCESITEID from kafka_tlk_attendance_site_person_info " ] }