package com.bcxin.flink.cdc.kafka.source.task;

import com.bcxin.event.core.JobParameterDTO;
import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.enums.JobType;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.core.exceptions.IllegalArgumentEventException;
import com.bcxin.event.core.exceptions.NoFoundEventException;
import com.bcxin.flink.cdc.kafka.source.task.compnents.JdbcDbReaderComponent;
import com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcHttpSinkJob;
import com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcKafkaJob;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CdcDatabaseSourceProperty;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CheckpointConfigPropertyBuilder;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.Constants;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.HttpRegionSinkProperty;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.KafkaConfigProperty;
import com.bcxin.flink.streaming.cores.JdbcJobExecutorUtil;
import com.bcxin.flink.streaming.cores.SystemPropertyUtil;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.api.java.utils.ParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/JdbcBinLogCaptureApp.class */
/**
 * Command-line entry point of the binlog CDC capture job.
 *
 * <p>The launcher reads the {@code env}, {@code jobId} and {@code mode} arguments, merges the
 * environment configuration with the stored job parameters into one {@link Properties} object,
 * initialises {@code JobContext} for the selected sink and finally executes either
 * {@link DbStreamCdcHttpSinkJob} (when {@code mode} is {@code http-sink}) or
 * {@link DbStreamCdcKafkaJob} (any other mode).
 */
public class JdbcBinLogCaptureApp {
    private static final Logger logger = LoggerFactory.getLogger(JdbcBinLogCaptureApp.class);
    private static final JsonProvider jsonProvider = new JsonProviderImpl();

    /** Value of the {@code mode} argument that selects the HTTP sink job. */
    private static final String MODE_HTTP_SINK = "http-sink";

    /** Value of the {@code mode} argument that must not be forwarded as a binlog startup option. */
    private static final String MODE_SKIP_REDIS = "skip-redis";

    /** Reader component built by {@link #buildJobEnvironment} and handed to the Kafka job. */
    private static JdbcDbReaderComponent dbReaderComponent = null;

    /**
     * Parses the command line, prepares the job context and runs the selected job.
     *
     * @param strArr command-line arguments in the format accepted by {@code ParameterTool.fromArgs}
     * @throws Exception if the configuration is invalid or the job fails
     */
    public static void main(String[] strArr) throws Exception {
        logger.info("接收参数准备执行CDC的BINLOG读取:{}", String.join(";", strArr));
        ParameterTool parameterTool = ParameterTool.fromArgs(strArr);
        buildJobEnvironment(parameterTool);

        // The sink must be chosen from the parsed "mode" value, i.e. the same value that
        // buildJobEnvironment used to initialise JobContext. Inspecting the raw argument array by
        // position looks at a different token, so the executed job could disagree with the
        // context that was initialised.
        if (isHttpSink(resolveJobArguments(parameterTool))) {
            new DbStreamCdcHttpSinkJob().execute();
        } else {
            new DbStreamCdcKafkaJob(dbReaderComponent).execute();
        }
    }

    /**
     * Extracts the launcher arguments in the fixed order expected by {@link #loadProperties} and
     * {@link #isHttpSink}: {@code [env, jobId, mode]}. Missing arguments are {@code null}.
     */
    private static String[] resolveJobArguments(ParameterTool parameterTool) {
        return new String[] {
            parameterTool.get("env"), parameterTool.get("jobId"), parameterTool.get("mode")
        };
    }

    /**
     * Loads the merged configuration and initialises {@code JobContext} for the selected sink.
     *
     * @param parameterTool parsed command-line arguments
     * @throws IOException if the configuration cannot be loaded
     * @throws BadEventException if HTTP sink mode is selected but no region sink is configured
     */
    private static void buildJobEnvironment(ParameterTool parameterTool) throws IOException {
        String[] jobArguments = resolveJobArguments(parameterTool);
        Properties properties = loadProperties(jobArguments);
        CheckpointConfigProperty checkpointConfig = CheckpointConfigPropertyBuilder.build(properties, parameterTool);
        dbReaderComponent = JdbcDbReaderComponent.build(properties);

        if (!isHttpSink(jobArguments)) {
            JobContext.initKafkaSink(
                    properties.getProperty(Constants.JOB_NAME),
                    CdcDatabaseSourceProperty.create(properties),
                    checkpointConfig,
                    KafkaConfigProperty.create(
                            properties.getProperty(Constants.KAFKA_BOOTSTRAP_SERVER),
                            properties.getProperty(Constants.KAFKA_CDC_TOPIC_PREFIX),
                            properties.getProperty(Constants.KAFKA_CONSUMER_GROUP_ID)));
            return;
        }

        // Offer every property to HttpRegionSinkProperty; entries for which it returns null are
        // ignored (presumably keys that do not describe a region sink).
        ArrayList<HttpRegionSinkProperty> regionSinks = new ArrayList<>();
        for (String name : properties.stringPropertyNames()) {
            HttpRegionSinkProperty regionSink =
                    HttpRegionSinkProperty.getRegionConfigProperty(name, properties.getProperty(name));
            if (regionSink != null) {
                regionSinks.add(regionSink);
            }
        }
        if (regionSinks.isEmpty()) {
            throw new BadEventException("无效Http Region配置信息");
        }
        JobContext.initHttpSink(
                properties.getProperty(Constants.JOB_NAME),
                regionSinks,
                CdcDatabaseSourceProperty.create(properties),
                checkpointConfig);
    }

    /**
     * Builds the job configuration: the environment configuration overlaid with the two JSON
     * parameter objects stored for the job, plus the job id and mode-derived settings.
     *
     * @param strArr launcher arguments as {@code [env, jobId, mode]}; {@code mode} is optional
     * @return the merged properties
     * @throws IOException if the environment configuration cannot be read
     * @throws BadEventException if {@code jobId} is missing or not an integer, or if a stored
     *     parameter cannot be parsed or contains an unsupported value
     * @throws NoFoundEventException if no job parameter exists for {@code jobId}
     * @throws IllegalArgumentEventException if the job is not of type {@code CDC_BINLOG}
     */
    private static Properties loadProperties(String[] strArr) throws IOException {
        // The argument array built by the launcher always has three slots, so a missing jobId shows
        // up as a null element rather than as a short array; check both.
        if (strArr == null || strArr.length < 2 || strArr[1] == null || strArr[1].trim().isEmpty()) {
            throw new BadEventException("无提供有效配置参数(环境以及jobId), 系统无法启动");
        }
        int jobId;
        try {
            jobId = Integer.parseInt(strArr[1]);
        } catch (NumberFormatException e) {
            throw new BadEventException(String.format("jobId(%s)不是有效的整数", strArr[1]), e);
        }

        Properties envConf = SystemPropertyUtil.loadEnvConf(strArr[0]);
        JobParameterDTO jobParameter = JdbcJobExecutorUtil.getJobParameter(jobId);
        if (jobParameter == null) {
            throw new NoFoundEventException("参数无效; 无法加载对应的job参数");
        }
        if (jobParameter.getJobType() != JobType.CDC_BINLOG) {
            throw new IllegalArgumentEventException(String.format("该jobType(%s)不支持BINLOG CDC处理", jobParameter.getJobType()));
        }

        // Parse each parameter on its own so that a failure is reported against the JSON text that
        // actually caused it.
        Map<?, ?> firstOverrides = parseJsonMap(jobParameter.getParam1(jsonProvider));
        Map<?, ?> secondOverrides = parseJsonMap(jobParameter.getParam2(jsonProvider));

        // PARAM_JOB_ID_NAME is written before the overrides (so they may replace it), whereas
        // PARAM_JOB_ID is written afterwards and therefore always reflects the command-line jobId.
        envConf.setProperty("PARAM_JOB_ID_NAME", String.valueOf(jobId));
        applyOverrides(envConf, firstOverrides);
        applyOverrides(envConf, secondOverrides);
        envConf.setProperty("PARAM_JOB_ID", String.valueOf(jobId));

        String mode = strArr.length > 2 ? strArr[2] : null;
        if (mode != null && !isHttpSink(strArr)) {
            envConf.setProperty("PARAM_BINLOG_SKIP_REDIS_CALCULATE", mode);
            if (!MODE_SKIP_REDIS.equalsIgnoreCase(mode)) {
                envConf.setProperty("PARAM_BINLOG_STARTUP_OPTION", mode);
            }
        }
        return envConf;
    }

    /**
     * Parses one stored job parameter as a JSON object.
     *
     * @param json JSON text of the parameter
     * @return the parsed map, or {@code null} if the JSON provider yields none
     * @throws BadEventException if the text cannot be converted to a map
     */
    private static Map<?, ?> parseJsonMap(String json) {
        try {
            return (Map<?, ?>) jsonProvider.toObject(Map.class, json);
        } catch (Exception e) {
            throw new BadEventException(String.format("转化异常:%s", json), e);
        }
    }

    /**
     * Copies every entry of {@code overrides} into {@code target} as a string property.
     *
     * <p>String, numeric and boolean values are stored in their string form. Any other value
     * (including {@code null}, which {@link Properties} cannot hold) is rejected, because it has
     * no meaningful property representation.
     *
     * @param target properties to update
     * @param overrides parsed JSON object; {@code null} means nothing to apply
     * @throws BadEventException if an entry has an unsupported value
     */
    private static void applyOverrides(Properties target, Map<?, ?> overrides) {
        if (overrides == null) {
            return;
        }
        for (Map.Entry<?, ?> entry : overrides.entrySet()) {
            Object value = entry.getValue();
            boolean scalar = value instanceof String || value instanceof Number || value instanceof Boolean;
            if (!scalar) {
                throw new BadEventException(String.format("配置项(%s)的值类型不受支持: %s", entry.getKey(), value));
            }
            target.setProperty(String.valueOf(entry.getKey()), String.valueOf(value));
        }
    }

    /**
     * Tells whether the launcher arguments select the HTTP sink.
     *
     * @param strArr launcher arguments as {@code [env, jobId, mode]}
     * @return {@code true} if a third element exists and equals {@code http-sink} ignoring case
     */
    private static boolean isHttpSink(String[] strArr) {
        return strArr != null && strArr.length > 2 && MODE_HTTP_SINK.equalsIgnoreCase(strArr[2]);
    }
}
