/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.compnents.WebHttpSinkWriter;
import com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcJobAbstract;
import com.bcxin.flink.streaming.cores.JdbcJobExecutorUtil;
import com.bcxin.flink.streaming.cores.dtos.JdbcConnectionDto;
import java.sql.Connection;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * CDC streaming job that forwards every {@link BinlogCdcValue} of the stream to a
 * {@link WebHttpSinkWriter}.
 *
 * <p>The JDBC settings ({@code datasource.driver-class-name}, {@code datasource.url},
 * {@code datasource.username}, {@code datasource.password}) and the topic prefix
 * ({@code kafka.cdc.topic.prefix}) are read from the JVM system properties at the time the
 * sink is attached to the stream; the resulting strings are captured by the sink function.
 */
public class DbStreamCdcHttpSinkJob extends DbStreamCdcJobAbstract {

    /**
     * Attaches the HTTP sink to the CDC stream.
     *
     * @param env               the execution environment (not used by this implementation)
     * @param dataStreamSource  the stream of binlog change events to forward
     * @param binlogOffsetValue the binlog offset (not used by this implementation)
     */
    @Override
    protected void executeCoreAction(StreamExecutionEnvironment env, SingleOutputStreamOperator<BinlogCdcValue> dataStreamSource, BinlogOffsetValue binlogOffsetValue) {
        final JobContext jobContext = JobContext.getInstance();
        // Read every setting eagerly as a plain String so the sink captures only what it
        // needs instead of the whole system Properties object.
        final String driverClassName = System.getProperty("datasource.driver-class-name");
        final String url = System.getProperty("datasource.url");
        final String userName = System.getProperty("datasource.username");
        final String password = System.getProperty("datasource.password");
        final String topicPrefix = System.getProperty("kafka.cdc.topic.prefix");

        dataStreamSource.addSink(new RichSinkFunction<BinlogCdcValue>() {
            private WebHttpSinkWriter webHttpSinkWriter;
            private Connection connection;

            /**
             * Opens the JDBC connection and creates the HTTP writer that is handed that connection.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.connection = JdbcJobExecutorUtil.getConnection(
                        JdbcConnectionDto.create(driverClassName, url, userName, password));
                // NOTE(review): 5000 / 10000 are presumably timeouts in milliseconds - confirm
                // against the WebHttpSinkWriter constructor.
                this.webHttpSinkWriter = new WebHttpSinkWriter(
                        5000, 10000, jobContext.getHttpRegionSinkProperties(), topicPrefix, this.connection);
            }

            /**
             * Releases the writer, then the JDBC connection, then the parent resources.
             *
             * <p>Resources are released in reverse order of acquisition because the writer was
             * constructed with the connection. Each step runs in a {@code finally} so a failure
             * in one step cannot leak the remaining resources, and the null checks keep this
             * method safe when {@code open} failed part-way. If several steps throw, the
             * exception of the last failing step is the one propagated.
             */
            @Override
            public void close() throws Exception {
                try {
                    if (this.webHttpSinkWriter != null) {
                        this.webHttpSinkWriter.close();
                    }
                } finally {
                    try {
                        if (this.connection != null && !this.connection.isClosed()) {
                            this.connection.close();
                        }
                    } finally {
                        super.close();
                    }
                }
            }

            /**
             * Hands the payload of one change event to the HTTP writer.
             */
            @Override
            public void invoke(BinlogCdcValue value, SinkFunction.Context context) throws Exception {
                this.webHttpSinkWriter.write(value.getValue(), null);
            }
        });
    }

    /**
     * Returns the title prefix that identifies this job variant.
     *
     * @return the constant {@code "2httpSink"}
     */
    @Override
    protected String getJobPrefixTitle() {
        return "2httpSink";
    }
}

