{ "param1": "{\"job.name\":\"简化版本-CheckPoint-实时归集驻勤-驻勤人员信息-3.0\",\"checkpoint.mode\":\"file\",\"checkpoint.hdfs.config\":\"core-site-default.xml\",\"checkpoint.disk.location\":\"/tmp/cp-station-person\"}", "param2": "{\n \"name\": \"实时归集驻勤-驻勤关联人员信息\",\n \"jdbcMeta\": {\n \"url\": \"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\",\n \"userName\": \"v5_enterprise2021\",\n \"password\": \"Prod_v5#202109\",\n \"sqlMetaDefinitions\": [\n {\n \"tableName\": \"rd_security_station_summary\",\n \"sql\": \"create table jdbc_insert_rd_security_station_summary(id STRING,name STRING,station_type STRING,company_name STRING,principal_id STRING,principal_name STRING,principal_contact STRING,service_scope STRING,address STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),site_state STRING,begin_date TIMESTAMP(3),end_date TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)\"\n },\n {\n \"tableName\": \"rd_security_station_person_summary\",\n \"sql\": \"create table jdbc_insert_rd_security_station_person_summary(id STRING,employee_id STRING,security_station_id STRING,PRIMARY KEY (id) NOT ENFORCED)\"\n }\n ]\n },\n \"kafkaMeta\": {\n \"autoOffsetRest\": \"earliest\",\n \"scanStartupMode\": \"earliest-offset\",\n \"enableAutoCommit\": \"true\",\n \"bootstrapServer\": \"192.168.0.52:9092\",\n \"topicDefinitions\": [\n {\n \"topic\": \"obpm2.binlog-cdc.topic.tenant_organizations\",\n \"sql\": \"create table kafka_tenant_organizations(id STRING,name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING,PRIMARY KEY (id) NOT ENFORCED) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.tlk_attendance_site_base_info\",\n \"sql\": \"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, PRIMARY KEY (ID) NOT ENFORCED) \"\n },\n {\n \"topic\": \"baibaodunflow.binlog-cdc.topic.tlk_attendance_site_person_info\",\n \"sql\": \"create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_SECURITYNAME STRING,PRIMARY KEY (ID) NOT ENFORCED) \"\n }\n ]\n },\n \"executeSql\": [\n \"insert into jdbc_insert_rd_security_station_summary(id,name,station_type,company_name,principal_id,principal_name, principal_contact,service_scope,address,lon_lat,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,site_state,begin_date,end_date,supervise_region_code)select distinct bi.ID,bi.ITEM_attendanceSiteName,bi.ITEM_attendanceSiteType,o.name,bi.ITEM_principal,bi.ITEM_principalName,bi.ITEM_principalPhoneNo,bi.ITEM_serveObjectName,bi.ITEM_attendanceSiteFullAddress, convert2Point(ITEM_attendanceSiteLongitude,ITEM_attendanceSiteLatitude) as lonlat, o.id,'#',ifnull(bi.ITEM_officePoliceAddressID,'#') as supervise_depart_id,TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')), TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),bi.ITEM_attendanceSiteState,TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceStartDate/1000, 'yyyy-MM-dd HH:mm:ss')),TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceEndDate/1000, 'yyyy-MM-dd HH:mm:ss')),'flink' from kafka_tlk_attendance_site_base_info bi join kafka_tenant_organizations o on o.id=bi.DOMAINID where ITEM_attendanceSiteState in ('生效','待完善')\",\n \"insert into jdbc_insert_rd_security_station_person_summary(id,employee_id,security_station_id)select distinct ID,ITEM_securityId,ITEM_attendanceSiteId from kafka_tlk_attendance_site_person_info \"\n ]\n}" }