-- Seed row for `obpm2`.`flink_jobs`: registers the table-task job with id '195'.
--
-- `parameter_json` is a JSON object with two members, each holding a JSON-encoded string:
--   param1: job.name, checkpoint.mode, checkpoint.hdfs.config, checkpoint.disk.location
--   param2: name, jdbcMeta (url / userName / password / sqlMetaDefinitions),
--           kafkaMeta (autoOffsetRest / scanStartupMode / enableAutoCommit /
--           bootstrapServer / topicDefinitions) and executeSql.
-- executeSql holds one INSERT ... SELECT that joins kafka_tenant_users,
-- kafka_tenant_user_credentials (twice: the selected credential, and a LEFT JOIN on
-- credential_type = 7), kafka_tenant_employees and kafka_tenant_organizations,
-- filtered on ee.status = 0 and o.approved_information_status = 1, and writes to
-- jdbc_insert_rd_employee_summary.
--
-- WARNING: the parameter_json literal contains physical line breaks. Everything between
-- its opening and closing quote is runtime data; do not re-indent or re-wrap those lines.
--
-- NOTE(review): the payload embeds a JDBC user name/password and internal host
--   addresses in plaintext; consider sourcing them from a secret store.
-- NOTE(review): the key "autoOffsetRest" looks like a misspelling of "autoOffsetReset";
--   left untouched because the job runner presumably reads this exact key (confirm).
-- NOTE(review): kafka_tenant_user_credentials declares `record_time` as VARCHAR metadata
--   and defines the watermark on `ts`, an alias of it; confirm the engine accepts a
--   non-timestamp watermark column.
-- NOTE(review): the two tlk_attendance_site_* Kafka tables are declared under
--   topicDefinitions but are not referenced by executeSql; confirm they are still needed.
-- NOTE(review): checkpoint.disk.location ends in "-198" while this row's id is '195';
--   confirm that is intentional.
-- NOTE(review): the id is hard-coded, so re-running this script will presumably fail on a
--   duplicate key if `id` is unique (table definition not visible here).
INSERT INTO `obpm2`.`flink_jobs` (
    `id`,
    `name`,
    `job_type`,
    `parameter_json`,
    `description`,
    `is_online`,
    `last_updated_time`,
    `created_time`,
    `flink_job_id`,
    `flink_job_started_time`,
    `flink_job_started_response`,
    `title`,
    `is_deleted`
)
VALUES (
    '195',  -- id
    'TableTask-实时归集人员信息',  -- name
    '2',  -- job_type
    -- parameter_json (literal runs until the closing quote several lines below)
    '{\r\n \"param1\": \"{\\\"job.name\\\":\\\"简化版本-CheckPoint-实时归集人员信息-3.0\\\",\\\"checkpoint.mode\\\":\\\"file\\\",\\\"checkpoint.hdfs.config\\\":\\\"core-site-default.xml\\\",\\\"checkpoint.disk.location\\\":\\\"/tmp/cp-table-employee-np-198\\\"}\",\r\n \"param2\": \"{\\n \\\"name\\\": \\\"实时归集人员信息\\\",\\n \\\"jdbcMeta\\\": {\\n \\\"url\\\": \\\"jdbc:mysql://192.168.0.7:3368/dispatch?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8&allowMultiQueries=true&autoReconnect=true\\\",\\n \\\"userName\\\": \\\"v5_enterprise2021\\\",\\n \\\"password\\\": \\\"Prod_v5#202109\\\",\\n \\\"sqlMetaDefinitions\\\": [\\n {\\n \\\"tableName\\\": \\\"rd_employee_summary\\\",\\n \\\"sql\\\": \\\"create table jdbc_insert_rd_employee_summary(id STRING,supervise_region_code STRING,name STRING,head_photo STRING,contact STRING, company_name STRING,status INT,checked_status INT,authenticated_status INT,military_status STRING,security_certificate_no STRING,occupation_type INT,hired_date TIMESTAMP(3),leave_time TIMESTAMP(3),insure INT,organization_id STRING,tenant_user_id STRING,tenant_employee_id STRING,last_sync_time TIMESTAMP(3),last_updated_time TIMESTAMP(3),device_number STRING,tenant_im_user_id STRING,idCard_no STRING,cid STRING,PRIMARY KEY (id) NOT ENFORCED) \\\"\\n }\\n ]\\n },\\n \\\"kafkaMeta\\\": {\\n \\\"autoOffsetRest\\\": \\\"earliest\\\",\\n \\\"scanStartupMode\\\": \\\"earliest-offset\\\",\\n \\\"enableAutoCommit\\\": \\\"true\\\",\\n \\\"bootstrapServer\\\": \\\"192.168.0.52:9092\\\",\\n \\\"topicDefinitions\\\": [\\n {\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.data.tenant_organizations\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_organizations(id STRING,approved_information_status INT, name 
STRING,industry_code STRING,institutional_code STRING,place_of_business_address STRING,place_of_register_address STRING,latitude STRING,longitude STRING) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.data.tenant_user_credentials\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_user_credentials(id STRING,`record_time` VARCHAR METADATA FROM \'timestamp\',created_time BIGINT,address STRING,credential_type INT,front_photo STRING,head_photo STRING,name STRING,number STRING,reverse_photo STRING,selected INT,valid_date_from BIGINT,valid_date_to BIGINT,tenant_user_id STRING,mark STRING,district_code STRING,city_code STRING,province_code STRING, ts as record_time, WATERMARK FOR ts AS ts - INTERVAL \'10\' MINUTE) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"obpm2.binlog-cdc.topic.data.tenant_users\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_users(id STRING,`record_time` VARCHAR METADATA FROM \'timestamp\',authenticated_status INT,authenticated_result STRING,authenticated_time BIGINT,checked_status INT,third_party_loginNo STRING,created_time BIGINT ,email STRING,habit_setting STRING,head_photo STRING,im_identity STRING,lon_lat_json STRING,name STRING,sex INT,stature STRING,telephone STRING,user_type INT,selected_credential_id STRING,household_type STRING,one_inch_color_white_photo STRING,two_inch_color_blue_photo STRING,education STRING,marital_status STRING,military_status STRING,nation STRING,native_place STRING,politics_status STRING,cid STRING,wechat STRING,wechat_nicky STRING,last_updated_time BIGINT,background_screening_status INT,last_background_screening_time BIGINT,emergency_contact STRING,emergency_phone STRING,license_level STRING,place_of_now_address STRING,place_of_now_city_code STRING,place_of_now_city_name STRING,place_of_now_district_code STRING,place_of_now_district_name STRING,place_of_now_province_code STRING,place_of_now_province_name STRING,third_party_login_no STRING,last_sync_time BIGINT) \\\"\\n },\\n {\\n \\\"topic\\\": 
\\\"obpm2.binlog-cdc.topic.data.tenant_employees\\\",\\n \\\"sql\\\": \\\"create table kafka_tenant_employees(id STRING,created_time BIGINT,email STRING,hired_date BIGINT,job_number STRING,last_updated_time BIGINT,leave_note STRING,leave_operator_created_time BIGINT,leave_operator_id STRING,leave_operator_name STRING,leave_time BIGINT,master_slave_type INT,occupation_type INT,`position` STRING,positive_date BIGINT,salary STRING,salary_bank_number STRING,status INT,work_place STRING,organization_id STRING,superior_id STRING,tenant_user_id STRING,unique_offset STRING,insure INT,is_domain_admin INT,identification STRING,interview STRING,person_status INT,plan_positive_date BIGINT,probation STRING,hired_operator_created_time BIGINT,hired_operator_id STRING,hired_operator_name STRING,last_sync_time BIGINT) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_base_info\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_attendance_site_base_info(ID STRING,ITEM_officePoliceAddressID STRING,ITEM_attendanceSiteName STRING,ITEM_attendanceSiteType STRING,DOMAINID STRING,ITEM_principal STRING,ITEM_principalName STRING,ITEM_principalPhoneNo STRING,ITEM_serveObjectName STRING,ITEM_attendanceSiteAddress STRING,ITEM_attendanceSiteFullAddress STRING, ITEM_attendanceSiteLongitude STRING,ITEM_attendanceSiteLatitude STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT,ITEM_attendanceSiteState STRING, LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,\'yyyy-MM-dd HH:mm:ss\')), WATERMARK FOR ts AS ts - INTERVAL \'10\' MINUTE) \\\"\\n },\\n {\\n \\\"topic\\\": \\\"baibaodunflow.binlog-cdc.topic.data.tlk_attendance_site_person_info\\\",\\n \\\"sql\\\": \\\"create table kafka_tlk_attendance_site_person_info(ID STRING, ITEM_attendanceSiteId STRING,ITEM_securityId STRING,ITEM_securityName STRING,LASTMODIFIED BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(`LASTMODIFIED`/1000,\'yyyy-MM-dd HH:mm:ss\')), WATERMARK FOR ts AS 
ts - INTERVAL \'10\' MINUTE) \\\"\\n }\\n ]\\n },\\n \\\"executeSql\\\": [\\n \\\"INSERT INTO jdbc_insert_rd_employee_summary (\\\\n id,\\\\n supervise_region_code,\\\\n name,\\\\n head_photo,\\\\n contact,\\\\n company_name,\\\\n status,\\\\n checked_status,\\\\n authenticated_status,\\\\n military_status,\\\\n security_certificate_no,\\\\n occupation_type,\\\\n hired_date,\\\\n leave_time,\\\\n insure,\\\\n organization_id,\\\\n tenant_user_id,\\\\n tenant_employee_id,\\\\n last_sync_time,\\\\n last_updated_time,\\\\n device_number,\\\\n tenant_im_user_id,\\\\n idCard_no,\\\\n cid\\\\n) SELECT\\\\n ee.id,\\\\n \'#fk-t299\',\\\\n u.`name`,\\\\n c.head_photo,\\\\n u.telephone,\\\\n o.`name` AS orgName,\\\\n ee.`status`,\\\\n u.checked_status,\\\\n u.authenticated_status,\\\\n u.military_status,(\\\\n d.number\\\\n ) AS securityCredentialNo,\\\\n\\\\tee.occupation_type,\\\\n\\\\tIFNULL( TO_TIMESTAMP ( FROM_UNIXTIME( ee.hired_date / 1000, \'yyyy-MM-dd HH:mm:ss\' )), CURRENT_TIMESTAMP ) AS hired_date,\\\\n\\\\tTO_TIMESTAMP (\\\\n\\\\tFROM_UNIXTIME( ee.leave_time / 1000, \'yyyy-MM-dd HH:mm:ss\' )) AS leave_time,\\\\n\\\\tifnull( ee.insure, 0 ),\\\\n\\\\tee.organization_id,\\\\n\\\\tee.tenant_user_id,\\\\n\\\\tee.id,\\\\n\\\\tTO_TIMESTAMP (\\\\n\\\\tFROM_UNIXTIME( ee.last_sync_time / 1000, \'yyyy-MM-dd HH:mm:ss\' )) AS last_sync_time,\\\\n\\\\tTO_TIMESTAMP (\\\\n\\\\tFROM_UNIXTIME( ee.last_updated_time / 1000, \'yyyy-MM-dd HH:mm:ss\' )) AS last_updated_time,\\\\n\\\\t\'\',\\\\n\\\\tifnull( u.im_identity, \'#\' ),\\\\n\\\\tc.number,\\\\n\\\\tu.cid\\\\nFROM\\\\n kafka_tenant_users u\\\\n JOIN kafka_tenant_user_credentials c ON c.id = u.selected_credential_id\\\\n JOIN kafka_tenant_employees ee ON ee.tenant_user_id = u.id\\\\n JOIN kafka_tenant_organizations o ON o.id = ee.organization_id\\\\n LEFT JOIN kafka_tenant_user_credentials d on d.tenant_user_id=u.id and d.credential_type = 7\\\\nWHERE\\\\n ee.`status` =0 and o.approved_information_status=1\\\"\\n ]\\n}\"\r\n}',
    -- description; in English: "Flink captures binlog data from obpm2 + baibaodunflow and pushes it to Kafka"
    'flink从obpm2+baibaodunflow的binlog捕获数据推到kafka',
    '1',  -- is_online
    '2023-04-26 11:10:37',  -- last_updated_time
    '2023-03-05 20:21:59',  -- created_time
    NULL,  -- flink_job_id
    NULL,  -- flink_job_started_time
    NULL,  -- flink_job_started_response
    'xxxx',  -- title
    '0'  -- is_deleted
);