{ "name": "实时归集驻勤-驻勤关联人员信息", "jdbcMeta": { "url": "jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true", "userName": "v5_enterprise2021", "password": "Prod_v5#202109", "sqlMetaDefinitions": [ { "tableName": "rd_security_station_summary", "sql": "create table jdbc_insert_rd_security_station_summary(id STRING,name STRING,station_type STRING,company_name STRING,principal_id STRING,principal_name STRING,principal_contact STRING,service_scope STRING,address STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),site_state STRING,begin_date TIMESTAMP(3),end_date TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)" }, { "tableName": "rd_security_station_person_summary", "sql": "create table jdbc_insert_rd_security_station_person_summary(id STRING,employee_id STRING,security_station_id STRING,PRIMARY KEY (id) NOT ENFORCED)" } ] }, "kafkaMeta": { "autoOffsetRest": "earliest", "scanStartupMode": "earliest-offset", "enableAutoCommit": "true", "bootstrapServer": "192.168.0.52:9092", "topicDefinitions": [ { "topic": "obpm2.binlog-cdc.topic.tenant_organizations", "sql": "create table kafka_tenant_organizations(id STRING,name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING,PRIMARY KEY (id) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_attendance_site_base_info", "sql": "create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, PRIMARY KEY (ID) NOT ENFORCED) " }, { "topic": "baibaodunflow.binlog-cdc.topic.tlk_attendance_site_person_info", "sql": "create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_SECURITYNAME STRING,PRIMARY KEY (ID) NOT ENFORCED) " } ] }, "executeSql": [ "insert into jdbc_insert_rd_security_station_summary(id,name,station_type,company_name,principal_id,principal_name, principal_contact,service_scope,address,lon_lat,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,site_state,begin_date,end_date,supervise_region_code)select distinct bi.ID,bi.ITEM_attendanceSiteName,bi.ITEM_attendanceSiteType,o.name,bi.ITEM_principal,bi.ITEM_principalName,bi.ITEM_principalPhoneNo,bi.ITEM_serveObjectName,bi.ITEM_attendanceSiteFullAddress, convert2Point(ITEM_attendanceSiteLongitude,ITEM_attendanceSiteLatitude) as lonlat, o.id,'#',ifnull(bi.ITEM_officePoliceAddressID,'#') as supervise_depart_id,TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')), TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, 'yyyy-MM-dd HH:mm:ss')),bi.ITEM_attendanceSiteState,TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceStartDate/1000, 'yyyy-MM-dd HH:mm:ss')),TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceEndDate/1000, 'yyyy-MM-dd HH:mm:ss')),'flink' from kafka_tlk_attendance_site_base_info bi join kafka_tenant_organizations o on o.id=bi.DOMAINID where ITEM_attendanceSiteState in ('生效','待完善')", "insert into jdbc_insert_rd_security_station_person_summary(id,employee_id,security_station_id)select distinct ID,ITEM_securityId,ITEM_attendanceSiteId from kafka_tlk_attendance_site_person_info " ] }