-- Seed row 196 of `obpm2`.`flink_jobs`.
--
-- `parameter_json` holds a JSON object with two members, each of which is itself
-- a JSON document encoded as a string:
--   * param1: job name plus checkpoint settings (mode, HDFS config file name,
--             disk location).
--   * param2: the job definition proper:
--       - jdbcMeta:   JDBC url / userName / password and two sink table
--                     definitions (rd_security_station_summary,
--                     rd_security_station_person_summary).
--       - kafkaMeta:  consumer settings, bootstrap server and three source topic
--                     definitions (tenant_organizations,
--                     tlk_attendance_site_base_info,
--                     tlk_attendance_site_person_info).
--       - executeSql: two insert-select statements. The first joins
--                     kafka_tlk_attendance_site_base_info to
--                     kafka_tenant_organizations on o.id = bi.DOMAINID, keeps
--                     only the two listed ITEM_attendanceSiteState values and
--                     writes to jdbc_insert_rd_security_station_summary. The
--                     second copies distinct (ID, ITEM_securityId,
--                     ITEM_attendanceSiteId) rows into
--                     jdbc_insert_rd_security_station_person_summary.
--
-- The literal physically spans three lines. Those two raw line breaks (and the
-- space preceding each) are inside the quotes and are therefore part of the
-- stored value: do not re-indent or re-wrap the continuation lines.
--
-- NOTE(review): the JDBC userName/password are stored in plaintext inside the
--   payload -- confirm this is acceptable for a script kept under version control.
-- NOTE(review): the key "autoOffsetRest" looks like a misspelling of
--   "autoOffsetReset"; confirm which spelling the consumer reads before changing it.
INSERT INTO `obpm2`.`flink_jobs` (
    `id`,
    `name`,
    `job_type`,
    `parameter_json`,
    `description`,
    `is_online`,
    `last_updated_time`,
    `created_time`,
    `flink_job_id`,
    `flink_job_started_time`,
    `flink_job_started_response`,
    `title`,
    `is_deleted`
) VALUES (
    '196',                                   -- id
    'TableTask-实时归集驻勤及关联人员信息',  -- name
    '2',                                     -- job_type
    -- parameter_json (continuation lines must stay at column 0)
    '{\r\n \"param1\": \"{\\\"job.name\\\":\\\"简化版本-CheckPoint-实时归集驻勤-驻勤人员信息-3.0\\\",\\\"checkpoint.mode\\\":\\\"file\\\",\\\"checkpoint.hdfs.config\\\":\\\"core-site-default.xml\\\",\\\"checkpoint.disk.location\\\":\\\"/tmp/cp-station-person\\\"}\",\r\n \"param2\": \"{\\n \\\"name\\\": \\\"实时归集驻勤-驻勤关联人员信息\\\",\\n \\\"jdbcMeta\\\": {\\n \\\"url\\\": \\\"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\\\",\\n \\\"userName\\\": \\\"v5_enterprise2021\\\",\\n \\\"password\\\": \\\"Prod_v5#202109\\\",\\n \\\"sqlMetaDefinitions\\\": [\\n {\\n \\\"tableName\\\": \\\"rd_security_station_summary\\\",\\n \\\"sql\\\": \\\"create table jdbc_insert_rd_security_station_summary(id STRING,name STRING,station_type STRING,company_name STRING,principal_id STRING,principal_name STRING,principal_contact STRING,service_scope STRING,address STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),site_state STRING,begin_date TIMESTAMP(3),end_date TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)\\\"\\n },\\n {\\n \\\"tableName\\\": \\\"rd_security_station_person_summary\\\",\\n \\\"sql\\\": \\\"create table jdbc_insert_rd_security_station_person_summary(id STRING,employee_id STRING,security_station_id STRING,PRIMARY KEY (id) NOT ENFORCED)\\\"\\n }\\n ]\\n },\\n \\\"kafkaMeta\\\": {\\n \\\"autoOffsetRest\\\": \\\"earliest\\\",\\n \\\"scanStartupMode\\\": \\\"earliest-offset\\\",\\n \\\"enableAutoCommit\\\": \\\"true\\\",\\n \\\"bootstrapServer\\\": \\\"192.168.0.52:9092\\\",\\n \\\"topicDefinitions\\\": [\\n 
{\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.tenant_organizations\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_organizations(id STRING,name STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING,PRIMARY KEY (id) NOT ENFORCED) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.tlk_attendance_site_base_info\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, PRIMARY KEY (ID) NOT ENFORCED) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.tlk_attendance_site_person_info\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_SECURITYNAME STRING,PRIMARY KEY (ID) NOT ENFORCED) \\\"\\n }\\n ]\\n },\\n \\\"executeSql\\\": [\\n \\\"insert into jdbc_insert_rd_security_station_summary(id,name,station_type,company_name,principal_id,principal_name, principal_contact,service_scope,address,lon_lat,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,site_state,begin_date,end_date,supervise_region_code)select distinct bi.ID,bi.ITEM_attendanceSiteName,bi.ITEM_attendanceSiteType,o.name,bi.ITEM_principal,bi.ITEM_principalName,bi.ITEM_principalPhoneNo,bi.ITEM_serveObjectName,bi.ITEM_attendanceSiteFullAddress, convert2Point(ITEM_attendanceSiteLongitude,ITEM_attendanceSiteLatitude) as lonlat, 
o.id,\'#\',ifnull(bi.ITEM_officePoliceAddressID,\'#\') as supervise_depart_id,TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, \'yyyy-MM-dd HH:mm:ss\')), TO_TIMESTAMP(FROM_UNIXTIME(bi.LASTMODIFIED/1000, \'yyyy-MM-dd HH:mm:ss\')),bi.ITEM_attendanceSiteState,TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceStartDate/1000, \'yyyy-MM-dd HH:mm:ss\')),TO_TIMESTAMP(FROM_UNIXTIME(bi.ITEM_attendanceEndDate/1000, \'yyyy-MM-dd HH:mm:ss\')),\'flink\' from kafka_tlk_attendance_site_base_info bi join kafka_tenant_organizations o on o.id=bi.DOMAINID where ITEM_attendanceSiteState in (\'生效\',\'待完善\')\\\",\\n \\\"insert into jdbc_insert_rd_security_station_person_summary(id,employee_id,security_station_id)select distinct ID,ITEM_securityId,ITEM_attendanceSiteId from kafka_tlk_attendance_site_person_info \\\"\\n ]\\n}\"\r\n}',
    'flink从obpm2+baibaodunflow的binlog捕获数据推到kafka',  -- description
    '1',                                     -- is_online
    '2023-04-24 14:28:48',                   -- last_updated_time
    '2023-03-05 20:21:59',                   -- created_time
    NULL,                                    -- flink_job_id
    NULL,                                    -- flink_job_started_time
    NULL,                                    -- flink_job_started_response
    'xxxx',                                  -- title
    '0'                                      -- is_deleted
);