package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.compnents.WebHttpSinkWriter;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.Constants;
import com.bcxin.flink.streaming.cores.JdbcJobExecutorUtil;
import com.bcxin.flink.streaming.cores.dtos.JdbcConnectionDto;
import java.sql.Connection;
import java.util.Properties;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/jobs/DbStreamCdcHttpSinkJob.class */
/**
 * CDC job variant that attaches a sink forwarding every {@link BinlogCdcValue} of the
 * incoming stream to a {@link WebHttpSinkWriter}.
 *
 * <p>The JDBC settings and the topic prefix are read from JVM system properties at the
 * time {@link #executeCoreAction} runs and are captured by the sink function.
 */
public class DbStreamCdcHttpSinkJob extends DbStreamCdcJobAbstract {
    /**
     * Attaches the HTTP sink to the given CDC stream.
     *
     * @param streamExecutionEnvironment execution environment of the job (not used here)
     * @param singleOutputStreamOperator stream of CDC values the sink is added to
     * @param binlogOffsetValue          binlog offset of the job (not used here)
     */
    @Override
    protected void executeCoreAction(StreamExecutionEnvironment streamExecutionEnvironment, SingleOutputStreamOperator<BinlogCdcValue> singleOutputStreamOperator, BinlogOffsetValue binlogOffsetValue) {
        final JobContext jobContext = JobContext.getInstance();
        final Properties properties = System.getProperties();
        // Resolve the datasource settings once, up front, so the sink captures plain strings.
        final String driverClassName = properties.getProperty("datasource.driver-class-name");
        final String jdbcUrl = properties.getProperty("datasource.url");
        final String jdbcUsername = properties.getProperty("datasource.username");
        final String jdbcPassword = properties.getProperty("datasource.password");
        singleOutputStreamOperator.addSink(new RichSinkFunction<BinlogCdcValue>() {
            private WebHttpSinkWriter webHttpSinkWriter;
            private Connection connection;

            /**
             * Opens the JDBC connection and creates the HTTP writer that is handed that connection.
             */
            @Override
            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                this.connection = JdbcJobExecutorUtil.getConnection(JdbcConnectionDto.create(driverClassName, jdbcUrl, jdbcUsername, jdbcPassword));
                // NOTE(review): 5000 / 10000 are passed through unchanged; presumably timeouts or
                // batching limits of WebHttpSinkWriter - confirm against its constructor.
                this.webHttpSinkWriter = new WebHttpSinkWriter(5000, 10000, jobContext.getHttpRegionSinkProperties(), properties.getProperty(Constants.KAFKA_CDC_TOPIC_PREFIX), this.connection);
            }

            /**
             * Releases the writer, then the JDBC connection, then delegates to the parent.
             *
             * <p>The writer was constructed with the connection, so it is closed first; the
             * connection is released afterwards. Each step runs even if the previous one threw,
             * and fields left null by a failed {@code open} are skipped.
             */
            @Override
            public void close() throws Exception {
                try {
                    if (this.webHttpSinkWriter != null) {
                        this.webHttpSinkWriter.close();
                    }
                } finally {
                    try {
                        if (this.connection != null && !this.connection.isClosed()) {
                            this.connection.close();
                        }
                    } finally {
                        super.close();
                    }
                }
            }

            /**
             * Forwards the payload of a single CDC record to the HTTP writer.
             */
            @Override
            public void invoke(BinlogCdcValue binlogCdcValue, SinkFunction.Context context) throws Exception {
                // The writer is called with a null SinkWriter.Context; the SinkFunction context is not forwarded.
                this.webHttpSinkWriter.write(binlogCdcValue.getValue(), (SinkWriter.Context) null);
            }
        });
    }

    /**
     * Returns the title prefix for this job.
     *
     * @return the constant {@code "2httpSink"}
     */
    @Override
    protected String getJobPrefixTitle() {
        return "2httpSink";
    }
}
