/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.event.core.FlinkJobAbstract;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.StartupOptionUtil;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.compnents.BinlogCheckpointProcessFunction;
import com.bcxin.flink.cdc.kafka.source.task.compnents.JsonBinlogMetaDebeziumDeserializationSchema;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CdcDatabaseSourceProperty;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.flink.streaming.cores.utils.StorageUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.kafka.connect.json.DecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DbStreamCdcJobAbstract
extends FlinkJobAbstract {
    private static Logger logger = LoggerFactory.getLogger(DbStreamCdcJobAbstract.class);

    protected final SingleOutputStreamOperator<BinlogCdcValue> initDatabaseBinlog(StreamExecutionEnvironment env, BinlogOffsetValue binlogOffsetValue) {
        JobContext jobContext = JobContext.getInstance();
        CdcDatabaseSourceProperty databaseProperty = jobContext.getDatabaseProperty();
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none");
        debeziumProperties.put("database.history.store.only.monitored.tables.ddl", "false");
        String[] dbLists = (String[])Arrays.stream(databaseProperty.getDbList().split(";")).filter(ii -> !StringUtils.isEmpty((CharSequence)ii)).toArray(String[]::new);
        String[] tableList = (String[])Arrays.stream(databaseProperty.getTableList().split(";")).filter(ii -> !StringUtils.isEmpty((CharSequence)ii)).toArray(String[]::new);
        logger.info("\u5f53\u524d\u8bbe\u7f6e\u7684serverId\u4e3a:{}", (Object)databaseProperty.getServerId());
        logger.error("\u6355\u83b7\u7684DB binlog\u7684\u914d\u7f6e\u4fe1\u606f\u4e3a:{}-{}", (Object)jobContext.getDatabaseProperty().getHostName(), (Object)databaseProperty.getUserName());
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("decimal.format", DecimalFormat.NUMERIC.name());
        JsonBinlogMetaDebeziumDeserializationSchema jsonBinlogMetaDebeziumDeserializationSchema = new JsonBinlogMetaDebeziumDeserializationSchema(false, config);
        StartupOptions startupOptions = StartupOptionUtil.calculateStartupOption(binlogOffsetValue);
        logger.error("v3-\u5f53\u524d\u7684\u5f00\u59cb\u9009\u9879\u4e3a={};offset={}", (Object)startupOptions.startupMode, (Object)startupOptions.binlogOffset);
        MySqlSourceBuilder mySqlSourceBuilder = MySqlSource.builder().hostname(jobContext.getDatabaseProperty().getHostName()).port(jobContext.getDatabaseProperty().getPort()).databaseList(dbLists).tableList(tableList).username(databaseProperty.getUserName()).password(databaseProperty.getPassword()).fetchSize(1500).serverId(databaseProperty.getServerId()).startupOptions(startupOptions).includeSchemaChanges(false).connectionPoolSize(200).connectTimeout(Duration.of(databaseProperty.getConnectTimeout(), ChronoUnit.MILLIS)).deserializer((DebeziumDeserializationSchema)jsonBinlogMetaDebeziumDeserializationSchema).debeziumProperties(debeziumProperties);
        if (!StringUtils.isEmpty((CharSequence)databaseProperty.getConnectionTimeZone())) {
            mySqlSourceBuilder = mySqlSourceBuilder.serverTimeZone(databaseProperty.getConnectionTimeZone());
        }
        if (!StringUtils.isEmpty((CharSequence)databaseProperty.getServerId())) {
            mySqlSourceBuilder = mySqlSourceBuilder.serverId(databaseProperty.getServerId());
        }
        MySqlSource mySqlSource = mySqlSourceBuilder.build();
        RedisConfig redisConfig = RedisConfig.getDefaultFromMainThread();
        SingleOutputStreamOperator outputStreamOperator = env.fromSource((Source)mySqlSource, WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.of(200L, ChronoUnit.MILLIS)), databaseProperty.getName()).setParallelism(4).keyBy((KeySelector & Serializable)ix -> ix.getFullTable()).process((KeyedProcessFunction)new BinlogCheckpointProcessFunction(redisConfig, binlogOffsetValue)).uid(String.format("binlog-cdc-mp-%s-%s", databaseProperty.getServerId(), databaseProperty.getJobId())).name(String.format("\u6570\u636e\u6e90:%s", databaseProperty.getName())).setParallelism(4);
        return outputStreamOperator;
    }

    protected void coreExecute() throws Exception {
        JobContext jobContext = JobContext.getInstance();
        Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, (Object)120L);
        configuration.set(BlobServerOptions.STORAGE_DIRECTORY, (Object)StorageUtil.getTmpPath());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend((StateBackend)rocksDBStateBackend);
        CheckpointConfigProperty configProperty = jobContext.getConfigProperty();
        String checkpointPath = configProperty.getCheckpointPath();
        env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
        logger.error("cdc.checkpoint\u7684PointStorage\u4f4d\u7f6e={};", (Object)configProperty.getCheckpointPath());
        env.enableCheckpointing(150000L);
        env.setRestartStrategy(RestartStrategies.fallBackRestart());
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
        BinlogOffsetValue binlogOffsetValue = StartupOptionUtil.getBinlogOffsetValue(RedisConfig.getDefaultFromMainThread(), jobContext.getDatabaseProperty());
        SingleOutputStreamOperator<BinlogCdcValue> dataStreamSource = this.initDatabaseBinlog(env, binlogOffsetValue);
        this.executeCoreAction(env, dataStreamSource, binlogOffsetValue);
        env.execute(String.format("flink-v10-%s-%s-%s", this.getJobPrefixTitle(), jobContext.getEnv(), jobContext.getName(), jobContext.getDatabaseProperty().getJobId()));
    }

    protected abstract void executeCoreAction(StreamExecutionEnvironment var1, SingleOutputStreamOperator<BinlogCdcValue> var2, BinlogOffsetValue var3);

    protected abstract String getJobPrefixTitle();
}

