package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.event.core.FlinkJobAbstract;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.StartupOptionUtil;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.compnents.BinlogCheckpointProcessFunction;
import com.bcxin.flink.cdc.kafka.source.task.compnents.JsonBinlogMetaDebeziumDeserializationSchema;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CdcDatabaseSourceProperty;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.flink.streaming.cores.utils.StorageUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/jobs/DbStreamCdcJobAbstract.class */
/**
 * Base class for Flink jobs that read MySQL binlog changes through the Flink CDC
 * {@link MySqlSource} and pass the resulting stream to a subclass-defined pipeline.
 *
 * <p>{@link #coreExecute()} configures the execution environment (RocksDB state backend,
 * checkpointing, restart strategy), builds the binlog stream with
 * {@link #initDatabaseBinlog(StreamExecutionEnvironment, BinlogOffsetValue)}, delegates to
 * {@link #executeCoreAction} and finally submits the job.
 *
 * <p>No {@code $deserializeLambda$} method is declared here on purpose: the compiler generates
 * it automatically for the serializable key-selector lambda used in this class.
 */
public abstract class DbStreamCdcJobAbstract extends FlinkJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DbStreamCdcJobAbstract.class);

    /** Parallelism applied to both the CDC source and the checkpoint process operator. */
    private static final int SOURCE_PARALLELISM = 4;
    /** Fetch size handed to the MySQL source builder. */
    private static final int FETCH_SIZE = 1500;
    /** Connection pool size handed to the MySQL source builder. */
    private static final int CONNECTION_POOL_SIZE = 200;
    /** Bounded out-of-orderness used for the source watermark strategy, in milliseconds. */
    private static final long WATERMARK_OUT_OF_ORDERNESS_MS = 200L;
    /** Value for {@link BlobServerOptions#CLEANUP_INTERVAL}. */
    private static final long BLOB_CLEANUP_INTERVAL = 120L;
    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 150_000L;
    /** Minimum pause between two checkpoints, in milliseconds. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 20_000L;
    /** Number of checkpoint failures tolerated before the job fails. */
    private static final int TOLERABLE_CHECKPOINT_FAILURES = 10;

    /**
     * Builds the MySQL CDC source described by the current {@link JobContext} and returns the
     * binlog stream keyed by full table name and passed through
     * {@link BinlogCheckpointProcessFunction}.
     *
     * @param streamExecutionEnvironment environment the source is registered on
     * @param binlogOffsetValue          previously stored offset; used to compute the startup
     *                                   option and forwarded to the checkpoint process function
     * @return the processed binlog stream, with uid, name and parallelism already set
     */
    protected final SingleOutputStreamOperator<BinlogCdcValue> initDatabaseBinlog(StreamExecutionEnvironment streamExecutionEnvironment, BinlogOffsetValue binlogOffsetValue) {
        CdcDatabaseSourceProperty databaseProperty = JobContext.getInstance().getDatabaseProperty();

        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none");
        debeziumProperties.put("database.history.store.only.monitored.tables.ddl", "false");

        String[] databases = splitNonEmpty(databaseProperty.getDbList());
        String[] tables = splitNonEmpty(databaseProperty.getTableList());

        // NOTE(review): the two messages below are informational but logged at ERROR level;
        // the level is kept as-is so they stay visible under the current logging configuration.
        logger.info("当前设置的serverId为:{}", databaseProperty.getServerId());
        logger.error("捕获的DB binlog的配置信息为:{}-{}", databaseProperty.getHostName(), databaseProperty.getUserName());

        // Raw type kept deliberately: the constructor signature of the deserialization schema
        // is not visible from this file.
        HashMap converterConfigs = new HashMap();
        converterConfigs.put("decimal.format", DecimalFormat.NUMERIC.name());
        JsonBinlogMetaDebeziumDeserializationSchema deserializer = new JsonBinlogMetaDebeziumDeserializationSchema(false, converterConfigs);

        StartupOptions startupOption = StartupOptionUtil.calculateStartupOption(binlogOffsetValue);
        logger.error("v3-当前的开始选项为={};offset={}", startupOption.startupMode, startupOption.binlogOffset);

        MySqlSourceBuilder<BinlogCdcValue> sourceBuilder = MySqlSource.<BinlogCdcValue>builder()
                .hostname(databaseProperty.getHostName())
                .port(databaseProperty.getPort())
                .databaseList(databases)
                .tableList(tables)
                .username(databaseProperty.getUserName())
                .password(databaseProperty.getPassword())
                .fetchSize(FETCH_SIZE)
                .startupOptions(startupOption)
                .includeSchemaChanges(false)
                .connectionPoolSize(CONNECTION_POOL_SIZE)
                .connectTimeout(Duration.ofMillis(databaseProperty.getConnectTimeout()))
                .deserializer(deserializer)
                .debeziumProperties(debeziumProperties);

        if (StringUtils.isNotEmpty(databaseProperty.getConnectionTimeZone())) {
            sourceBuilder = sourceBuilder.serverTimeZone(databaseProperty.getConnectionTimeZone());
        }
        // The server id is only applied when one is configured. It used to be passed to the
        // builder unconditionally as well, which made this guard ineffective.
        if (StringUtils.isNotEmpty(databaseProperty.getServerId())) {
            sourceBuilder = sourceBuilder.serverId(databaseProperty.getServerId());
        }

        return streamExecutionEnvironment
                .fromSource(
                        sourceBuilder.build(),
                        WatermarkStrategy.<BinlogCdcValue>forBoundedOutOfOrderness(Duration.ofMillis(WATERMARK_OUT_OF_ORDERNESS_MS)),
                        databaseProperty.getName())
                .setParallelism(SOURCE_PARALLELISM)
                .keyBy(binlogCdcValue -> binlogCdcValue.getFullTable())
                .process(new BinlogCheckpointProcessFunction(RedisConfig.getDefaultFromMainThread(), binlogOffsetValue))
                .uid(String.format("binlog-cdc-mp-%s-%s", databaseProperty.getServerId(), databaseProperty.getJobId()))
                .name(String.format("数据源:%s", databaseProperty.getName()))
                .setParallelism(SOURCE_PARALLELISM);
    }

    /**
     * Configures the Flink environment, wires the binlog source into the subclass pipeline and
     * submits the job.
     *
     * @throws Exception if building or executing the job fails
     */
    protected void coreExecute() throws Exception {
        JobContext jobContext = JobContext.getInstance();

        Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, BLOB_CLEANUP_INTERVAL);
        configuration.set(BlobServerOptions.STORAGE_DIRECTORY, StorageUtil.getTmpPath());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // 'true' is the constructor flag of EmbeddedRocksDBStateBackend for incremental checkpointing.
        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
        stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        executionEnvironment.setStateBackend(stateBackend);

        CheckpointConfigProperty configProperty = jobContext.getConfigProperty();
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage(configProperty.getCheckpointPath());
        logger.error("cdc.checkpoint的PointStorage位置={};", configProperty.getCheckpointPath());

        executionEnvironment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        executionEnvironment.setRestartStrategy(RestartStrategies.fallBackRestart());
        checkpointConfig.setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setTolerableCheckpointFailureNumber(TOLERABLE_CHECKPOINT_FAILURES);

        BinlogOffsetValue binlogOffsetValue = StartupOptionUtil.getBinlogOffsetValue(RedisConfig.getDefaultFromMainThread(), jobContext.getDatabaseProperty());
        executeCoreAction(executionEnvironment, initDatabaseBinlog(executionEnvironment, binlogOffsetValue), binlogOffsetValue);

        // Four placeholders for four arguments: the job id used to be passed but silently
        // dropped because the format string only had three placeholders.
        executionEnvironment.execute(String.format("flink-v10-%s-%s-%s-%s", getJobPrefixTitle(), jobContext.getEnv(), jobContext.getName(), jobContext.getDatabaseProperty().getJobId()));
    }

    /**
     * Attaches the job-specific operators and sinks to the binlog stream.
     *
     * @param streamExecutionEnvironment environment the job is being built on
     * @param singleOutputStreamOperator binlog stream produced by {@link #initDatabaseBinlog}
     * @param binlogOffsetValue          offset value the source was started from
     */
    protected abstract void executeCoreAction(StreamExecutionEnvironment streamExecutionEnvironment, SingleOutputStreamOperator<BinlogCdcValue> singleOutputStreamOperator, BinlogOffsetValue binlogOffsetValue);

    /**
     * Returns the job-specific prefix that is embedded in the submitted Flink job name.
     *
     * @return prefix used as the first variable segment of the job name
     */
    protected abstract String getJobPrefixTitle();

    /** Splits a {@code ;}-separated list and drops empty entries. */
    private static String[] splitNonEmpty(String semicolonSeparated) {
        return Arrays.stream(semicolonSeparated.split(";"))
                .filter(StringUtils::isNotEmpty)
                .toArray(String[]::new);
    }
}
