{ "param1": "{\"job.name\":\"简化版本-CheckPoint-实时归集企业人员信息-3.0\",\"checkpoint.mode\":\"file\",\"checkpoint.hdfs.config\":\"core-site-default.xml\",\"checkpoint.disk.location\":\"/tmp/cp-table-company-np\"}", "param2": "{\n \"name\": \"实时归集企业-信息\",\n \"jdbcMeta\": {\n \"url\": \"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\",\n \"userName\": \"v5_enterprise2021\",\n \"password\": \"Prod_v5#202109\",\n \"sqlMetaDefinitions\": [\n {\n \"tableName\": \"rd_company_summary\",\n \"sql\": \"create table jdbc_insert_rd_company_summary(id STRING,count_of_activity_station BIGINT,count_of_security_man BIGINT, status INT, name STRING,legal STRING,legal_telephone STRING,institutional STRING,register_address STRING,business_address STRING,service_scope STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)\"\n }\n ]\n },\n \"kafkaMeta\": {\n \"autoOffsetRest\": \"earliest\",\n \"scanStartupMode\": \"earliest-offset\",\n \"enableAutoCommit\": \"true\",\n \"bootstrapServer\": \"192.168.0.52:9092\",\n \"topicDefinitions\": [\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_organizations\",\n \"sql\": \"create table kafka_tenant_organizations(id STRING,approved_information_status INT, name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.data.tlk_companyinformation\",\n \"sql\": \"create table kafka_tlk_companyinformation(ID STRING,ITEM_companyStatus STRING, DOMAINID STRING, ITEM_legalPerson STRING,ITEM_legalPersonPhone STRING,ITEM_businessScope STRING,LASTMODIFIED BIGINT) \"\n },\n {\n \"topic\": \"obpm2.binlog-cdc.topic.data.tenant_employees\",\n \"sql\": \"create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`created_time`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '6' HOUR) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_base_info\",\n \"sql\": \"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '6' HOUR) \"\n }\n ]\n },\n \"executeSql\": [\n \"insert into jdbc_insert_rd_company_summary(id,count_of_activity_station,count_of_security_man,name,status,legal,legal_telephone,institutional,register_address, business_address,service_scope,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,supervise_region_code,lon_lat)select o.id,(select count(1) from kafka_tlk_attendance_site_base_info k where k.DOMAINID=o.id and k.ITEM_attendanceSiteState in ('生效','待完善')) as countOfSiteBaseInfo,(select count(1) from kafka_tenant_employees k where k.organization_id=o.id and k.occupation_type=1 and k.status=0) as countOfMen,o.`name`,(case when ITEM_companyStatus='注销' then 2 when ITEM_companyStatus='撤销' then 1 else 0 end) as ITEM_companyStatus,ci.ITEM_legalPerson,ci.ITEM_legalPersonPhone,ifnull(o.institutional_code,''),o.place_of_register_address,o.place_of_business_address,ci.ITEM_businessScope,o.id,'#','#supervise_depart_id',TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),'fk-no-ep', convert2Point(o.longitude,o.latitude) as lonlat from kafka_tenant_organizations o left join kafka_tlk_companyinformation ci on o.id=ci.DOMAINID where o.approved_information_status=1 \"\n ]\n}" }