CREATE TABLE source_kafka_tenant_employees( id STRING,created_time BIGINT,tenant_user_id STRING,status SMALLINT,occupation_type SMALLINT, organization_id STRING,department_id STRING,hired_date BIGINT,leave_time BIGINT, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_employees-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tenant_users( id STRING,created_time BIGINT,name STRING,telephone STRING, selected_credential_id STRING,one_inch_color_white_photo STRING,checked_status SMALLINT,authenticated_status SMALLINT, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_users-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tenant_user_credentials( id STRING,created_time BIGINT,credential_type SMALLINT,front_photo STRING, head_photo STRING,name STRING,number STRING,tenant_user_id STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_user_credentials-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tenant_organizations( id STRING,created_time BIGINT,name STRING,place_of_business_address STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'obpm2.binlog-cdc.topic.v2.tenant_organizations-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tlk_attendance_site_base_info ( ID STRING,created_time BIGINT,ITEM_attendanceSiteName STRING,ITEM_attendanceStartDate BIGINT,ITEM_attendanceEndDate BIGINT, ITEM_attendanceSiteState STRING,DOMAINID STRING,ITEM_officePoliceAddressID STRING,ITEM_officePoliceAddress STRING, PRIMARY KEY (ID) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_base_info-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE source_kafka_tlk_attendance_site_person_info ( ID STRING,created_time BIGINT,ITEM_securityId STRING,ITEM_attendanceSiteId STRING,ITEM_documentID STRING, DOMAINID STRING, PRIMARY KEY (ID) NOT ENFORCED ) WITH ('connector'='kafka','scan.startup.mode'='group-offsets', 'properties.group.id'='flink-sql-consumer-jdbc-v3', 'topic' = 'baibaodunflow.binlog-cdc.topic.v2.tlk_attendance_site_base_info-test', 'properties.bootstrap.servers' = '117.78.39.204:9092','properties.auto.offset.reset' = 'earliest', 'properties.enable.auto.commit' = 'true','format'='debezium-json'); CREATE TABLE sink_flink_tenant_organizations( id STRING, created_time BIGINT, name STRING, place_of_business_address STRING, numOfSecurityMen BIGINT, numOfSecurityStation BIGINT, PRIMARY KEY (id) NOT ENFORCED ) with( 'connector'='jdbc', 'url'='jdbc:mysql://114.115.253.155:16823/obpm2', 'driver'='com.mysql.cj.jdbc.Driver', 'username'='root', 'password'='Dxdf#0519.com', 'table-name'='stream_tenant_organizations','sink.parallelism'='1' ); CREATE TABLE sink_flink_tenant_organizations_join( id STRING, created_time BIGINT, name STRING, place_of_business_address STRING, numOfSecurityMen BIGINT, numOfSecurityStation BIGINT, PRIMARY KEY (id) NOT ENFORCED ) with( 'connector'='jdbc', 'url'='jdbc:mysql://114.115.253.155:16823/obpm2', 'driver'='com.mysql.cj.jdbc.Driver', 'username'='root', 'password'='Dxdf#0519.com', 'table-name'='stream_tenant_organizations_join','sink.parallelism'='1' ); create table stream_tenant_organizations (id varchar(255) not null primary key, created_time datetime not null default CURRENT_TIMESTAMP, `name` varchar(300) not null, place_of_business_address varchar(2000), numOfSecurityMen int not null default 0, numOfSecurityStation int not null default 0 ) create table stream_tenant_organizations_join (id varchar(255) not null primary key, created_time datetime not null default CURRENT_TIMESTAMP, `name` varchar(300) not null, place_of_business_address varchar(2000), numOfSecurityMen int not null default 0, numOfSecurityStation int not null default 0 );