-- Seed row id=194 for `obpm2`.`flink_jobs`: a Flink streaming-job definition ("TableTask -
-- real-time aggregation of company info", job_type=2) consumed by a job-launcher service.
--
-- `parameter_json` is a MySQL-escaped JSON object with two string-valued params, each of
-- which is ITSELF an escaped JSON document (MySQL escape -> JSON escape -> Flink SQL quoting;
-- do not reformat this literal by hand — any change to the escaping corrupts the payload):
--   * param1: Flink runtime/checkpoint config (job.name, checkpoint mode=file, HDFS config
--     file, checkpoint disk location /tmp/cp-table-company-np).
--   * param2: pipeline definition —
--       - jdbcMeta: MySQL sink (jdbc_insert_rd_company_summary DDL, upsert via PRIMARY KEY
--         NOT ENFORCED) targeting 192.168.0.7:3368/dispatch;
--       - kafkaMeta: four CDC/binlog source topics (tenant_organizations,
--         tlk_companyinformation, tenant_employees, tlk_attendance_site_base_info), the
--         last two with event-time `ts` columns and a 6-hour watermark delay;
--       - executeSql: one INSERT..SELECT joining organizations to company info and counting
--         active attendance sites / active security staff per company via correlated
--         scalar subqueries (COUNT per outer row — acceptable in Flink's incremental
--         execution, but worth profiling if topics grow).
--
-- NOTE(review/security): plaintext database credentials (userName/password) are embedded in
-- `parameter_json`. This row is committed to version control — rotate the credential and
-- move it to a secrets store / runtime-injected config rather than seed data.
-- NOTE(review): `description` says data is captured from binlog and pushed TO kafka, but
-- the embedded executeSql consumes FROM kafka and writes to the JDBC sink — presumably the
-- description is stale; confirm and correct.
-- NOTE(review): placeholder values present — `title`='xxxx', and in executeSql the
-- supervise_domain_id/'#' and '#supervise_depart_id' constants look like unfinished
-- templating; confirm intended values before relying on this row in production.
-- NOTE(review): numeric-looking columns (`id`, `job_type`, `is_online`, `is_deleted`) are
-- inserted as quoted strings; MySQL coerces them, but unquoted literals would be cleaner
-- if the column types are integral — verify against the table DDL.
INSERT INTO `obpm2`.`flink_jobs` (`id`, `name`, `job_type`, `parameter_json`, `description`, `is_online`, `last_updated_time`, `created_time`, `flink_job_id`, `flink_job_started_time`, `flink_job_started_response`, `title`, `is_deleted`) VALUES ('194', 'TableTask-实时归集企业信息', '2', '{\r\n \"param1\": \"{\\\"job.name\\\":\\\"简化版本-CheckPoint-实时归集企业人员信息-3.0\\\",\\\"checkpoint.mode\\\":\\\"file\\\",\\\"checkpoint.hdfs.config\\\":\\\"core-site-default.xml\\\",\\\"checkpoint.disk.location\\\":\\\"/tmp/cp-table-company-np\\\"}\",\r\n \"param2\": \"{\\n \\\"name\\\": \\\"实时归集企业-信息\\\",\\n \\\"jdbcMeta\\\": {\\n \\\"url\\\": \\\"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\\\",\\n \\\"userName\\\": \\\"v5_enterprise2021\\\",\\n \\\"password\\\": \\\"Prod_v5#202109\\\",\\n \\\"sqlMetaDefinitions\\\": [\\n {\\n \\\"tableName\\\": \\\"rd_company_summary\\\",\\n \\\"sql\\\": \\\"create table jdbc_insert_rd_company_summary(id STRING,count_of_activity_station BIGINT,count_of_security_man BIGINT, status INT, name STRING,legal STRING,legal_telephone STRING,institutional STRING,register_address STRING,business_address STRING,service_scope STRING,lon_lat STRING,organization_id STRING,supervise_domain_id STRING,supervise_depart_id STRING, last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),supervise_region_code STRING,PRIMARY KEY (id) NOT ENFORCED)\\\"\\n }\\n ]\\n },\\n \\\"kafkaMeta\\\": {\\n \\\"autoOffsetRest\\\": \\\"earliest\\\",\\n \\\"scanStartupMode\\\": \\\"earliest-offset\\\",\\n \\\"enableAutoCommit\\\": \\\"true\\\",\\n \\\"bootstrapServer\\\": \\\"192.168.0.52:9092\\\",\\n \\\"topicDefinitions\\\": [\\n {\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.data.tenant_organizations\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_organizations(id STRING,approved_information_status INT, name STRING,industry_code STRING,institutional_code STRING,place_of_business_address 
STRING,place_of_register_address STRING,latitude STRING,longitude STRING) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.data.tlk_companyinformation\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_companyinformation(ID STRING,ITEM_companyStatus STRING, DOMAINID STRING, ITEM_legalPerson STRING,ITEM_legalPersonPhone STRING,ITEM_businessScope STRING,LASTMODIFIED BIGINT) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.data.tenant_employees\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`created_time`/1000,\'yyyy-MM-dd HH:mm:ss\')), WATERMARK FOR ts AS ts - INTERVAL \'6\' HOUR) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_base_info\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate 
BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,\'yyyy-MM-dd HH:mm:ss\')), WATERMARK FOR ts AS ts - INTERVAL \'6\' HOUR) \\\"\\n }\\n ]\\n },\\n \\\"executeSql\\\": [\\n \\\"insert into jdbc_insert_rd_company_summary(id,count_of_activity_station,count_of_security_man,name,status,legal,legal_telephone,institutional,register_address, business_address,service_scope,organization_id,supervise_domain_id,supervise_depart_id,last_sync_time,last_updated_time,supervise_region_code,lon_lat)select o.id,(select count(1) from kafka_tlk_attendance_site_base_info k where k.DOMAINID=o.id and k.ITEM_attendanceSiteState in (\'生效\',\'待完善\')) as countOfSiteBaseInfo,(select count(1) from kafka_tenant_employees k where k.organization_id=o.id and k.occupation_type=1 and k.status=0) as countOfMen,o.`name`,(case when ITEM_companyStatus=\'注销\' then 2 when ITEM_companyStatus=\'撤销\' then 1 else 0 end) as ITEM_companyStatus,ci.ITEM_legalPerson,ci.ITEM_legalPersonPhone,ifnull(o.institutional_code,\'\'),o.place_of_register_address,o.place_of_business_address,ci.ITEM_businessScope,o.id,\'#\',\'#supervise_depart_id\',TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, \'yyyy-MM-dd HH:mm:ss\')),TO_TIMESTAMP(FROM_UNIXTIME(ci.LASTMODIFIED/1000, \'yyyy-MM-dd HH:mm:ss\')),\'fk-no-ep\', convert2Point(o.longitude,o.latitude) as lonlat from kafka_tenant_organizations o left join kafka_tlk_companyinformation ci on o.id=ci.DOMAINID where o.approved_information_status=1 \\\"\\n ]\\n}\"\r\n}', 'flink从obpm2+baibaodunflow的binlog捕获数据推到kafka', '1', '2023-04-26 00:12:46', '2023-03-05 20:21:59', NULL, NULL, NULL, 'xxxx', '0');