{ "name": "Consume kafka to build rd_company_summary-v2.5", "jobType": "STREAMING_COMPUTED", "description": "flink从kafka消费数据并汇总数据到rd_company_summary", "isOnline": true, "param1": { "job.name": "Consume kafka(tlk_companyinformation) to build rd_company_summary-2.5", "checkpoint.mode": "file", "checkpoint.disk.location": "/data/soft/flink/config/kms_team/cdc-kafka-cosumer", "checkpoint.hdfs.config": "core-site-default.xml" }, "param2": { "id": "job-kafka-jdbc-baibaodunflow-events-tlk_companyinformation", "name": "baibaodunflow-tlk_companyinformation-table", "description": "extract baibaodunflow from kafka to rd_company_summary summary table", "kafka2JdbcDefinitions": [ { "name":"消费tlk_companyinformation事件消息到rd_company_summary", "kafkaDefinition": { "name": "kafka..", "bootstrapServer": "43.155.113.170:9092", "topic": "baibaodunflow.binlog-cdc.topic.events-2.tlk_companyinformation", "consumerGroupId": "data.collector.group.tlk_companyinformation.binlog-cdc", "autoOffsetRest": "earliest", "scanStartupMode": "group-offsets", "enableAutoCommit": "true", "primaryKeyName": "ID" }, "jdbcDefinitions": [ { "url": "jdbc:mysql://49.4.21.141:45611/securityflow?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8", "username": "bcx", "password": "Wstestv5qy#2022", "connectionTimeZone": "GMT-8", "driver": "com.mysql.cj.jdbc.Driver", "sqlTemplate": "INSERT INTO `securityflow`.`rd_company_summary`(`id`, `name`, `legal`, `registerAddress_name`, `registerAddress_detail`, `businessAddress_name`, `businessAddress_detail`, `service_scope`, `lonLat`, `status`, `serviceStationCount`, `activeSecurityManCount`, `last_sync_time`, `last_updated_time`) VALUES (:after.id, :after.ITEM_companyName, :after.ITEM_legalPerson, ifnull(:after.ITEM_registerAddress,''), ifnull(:after.ITEM_registerAddressDetail,''), ifnull(:after.ITEM_officeAddress,''), ifnull(:after.ITEM_officeAddressDetail,''), :after.ITEM_businessScope, '', (case when :after.ITEM_companyStatus='正常' then 1 else 0 end), 0, 0, \nCURRENT_TIMESTAMP, :after.LASTMODIFIED) ON DUPLICATE KEY UPDATE `name`=:after.ITEM_companyName,legal=:after.ITEM_legalPerson,registerAddress_name=ifnull(:after.ITEM_registerAddress,''),registerAddress_detail=ifnull(:after.ITEM_registerAddressDetail,''),businessAddress_name=ifnull(:after.ITEM_officeAddress,''),businessAddress_detail=ifnull(:after.ITEM_officeAddressDetail,''),service_scope=:after.ITEM_businessScope,status=(case when :after.ITEM_companyStatus='正常' then 1 else 0 end),last_sync_time=CURRENT_TIMESTAMP,last_updated_time=:after.LASTMODIFIED" } ] }, { "name":"消费tenant_employee事件消息到proc_rd_employee_summary", "kafkaDefinition": { "name": "消费kafka:obpm2.binlog-cdc.topic.events-2.tenant_employees", "bootstrapServer": "43.155.113.170:9092", "topic": "obpm2.binlog-cdc.topic.events-2.tenant_employees", "consumerGroupId": "data.collector.group.tenant_employees.binlog-cdc", "autoOffsetRest": "earliest", "scanStartupMode": "group-offsets", "enableAutoCommit": "true", "primaryKeyName": "ID" }, "jdbcDefinitions": [ { "name":"执行proc_rd_employee_summary存储过程: 同步人员", "url": "jdbc:mysql://49.4.21.141:45611/securityflow?characterEncoding=UTF-8&connectionTimeZone=GMT%2B8", "username": "bcx", "password": "Wstestv5qy#2022", "connectionTimeZone": "GMT-8", "driver": "com.mysql.cj.jdbc.Driver", "sqlTemplate": "call proc_rd_employee_summary(:after.id,:after.organization_id,:after.status,:after.organization_id,:after.last_sync_time)" } ] } ] } }