/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.tenant.data.etc.tasks.jobs;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.event.core.FlinkConstants;
import com.bcxin.event.core.FlinkJobAbstract;
import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.core.exceptions.NoSupportEventException;
import com.bcxin.event.core.jdbc.JdbcNameParameterSqlParserImpl;
import com.bcxin.event.core.jdbc.ParseSqlParameter;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.tenant.data.etc.tasks.components.BinlogRawValue;
import com.bcxin.tenant.data.etc.tasks.components.CustomJdbcOutputFormat;
import com.bcxin.tenant.data.etc.tasks.components.CustomJdbcOutputFormatParameterWrapper;
import com.bcxin.tenant.data.etc.tasks.components.impls.CustomJdbcAcceptPreparedStatementParameterImpl;
import com.bcxin.tenant.data.etc.tasks.jobs.DataEtcJob;
import com.bcxin.tenant.data.etc.tasks.properties.DataEtcConfigProperty;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.PreparedStatementCreatorFactory;
import org.springframework.util.StringUtils;

public abstract class DataJobAbstract
extends FlinkJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DataEtcJob.class);
    private final Set<String> _calculatedActionIdHash = Collections.synchronizedSet(new HashSet());
    private final Collection<DataEtcConfigProperty> configProperties;
    private final String configFile;
    private final boolean isDebug;
    private static Map<String, JdbcConnectionOptions> jdbcConnectionOptionsMap = new ConcurrentHashMap<String, JdbcConnectionOptions>();

    protected DataJobAbstract(Collection<DataEtcConfigProperty> configProperties, String configFile, boolean isDebug) {
        this.configProperties = configProperties;
        this.configFile = configFile;
        this.isDebug = isDebug;
    }

    protected void coreExecute() throws Exception {
        if (CollectionUtils.isEmpty(this.configProperties)) {
            throw new BadEventException("\u65e0\u6548\u914d\u7f6e\u6570\u636e; \u65e0\u6cd5\u542f\u52a8\u5f52\u96c6\u529f\u80fd");
        }
    }

    protected Collection<DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo> getMergedKafkaConnections(DataEtcConfigProperty config) {
        if (config == null || CollectionUtils.isEmpty(config.getTopicSubscribers())) {
            throw new BadEventException("\u65e0\u6548\u4e3b\u9898\u914d\u7f6e\u6570\u636e; \u65e0\u6cd5\u542f\u52a8\u5f52\u96c6\u529f\u80fd");
        }
        return config.getTopicSubscribers().stream().map(subscriberConfigProperty -> {
            Optional<DataEtcConfigProperty.KafkaConnectionConfigProperty> kafkaConnectionConfigOptional = config.getKafkaConnections().stream().filter(ii -> ii.getName().equalsIgnoreCase(subscriberConfigProperty.getRefKafkaName())).findFirst();
            if (!kafkaConnectionConfigOptional.isPresent()) {
                throw new BadEventException(String.format("\u65e0\u6548refKafkaName\u914d\u7f6e(%s), \u65e0\u6cd5\u627e\u5230\u5bf9\u5e94\u7684\u914d\u7f6e", subscriberConfigProperty.getRefKafkaName()));
            }
            DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo mergedKafkaConnectionTopicInfo = DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo.create(kafkaConnectionConfigOptional.get());
            mergedKafkaConnectionTopicInfo.addTopic(subscriberConfigProperty.getTopic());
            return mergedKafkaConnectionTopicInfo;
        }).collect(Collectors.toList());
    }

    protected void buildJdbcSubscriber(String bootstrapServer, SingleOutputStreamOperator<BinlogRawValue> binlogRawValueKeyedStream, JdbcExecutionOptions jdbcExecutionOptions, DataEtcConfigProperty config, Collection<DataEtcConfigProperty.JdbcSubscriberContentConfigProperty> jdbcSubscriberContentConfigProperties, String sourceUid) {
        if (CollectionUtils.isEmpty(jdbcSubscriberContentConfigProperties)) {
            throw new BadEventException("\u65e0\u6548\u7684\u76ee\u6807\u8ba2\u9605\u914d\u7f6e");
        }
        jdbcSubscriberContentConfigProperties.stream().map(ix -> ix.getRefJdbcName()).distinct().forEach(jdbcName -> {
            Optional<DataEtcConfigProperty.JdbcConnectionConfigProperty> jdbcConnectionConfigPropertyOptional = config.getJdbcConnections().stream().filter(jdbcC -> jdbcC.getName().equalsIgnoreCase((String)jdbcName)).findFirst();
            if (!jdbcConnectionConfigPropertyOptional.isPresent()) {
                throw new BadEventException("\u627e\u4e0d\u5230\u6570\u636e\u6e90\u914d\u7f6e");
            }
            DataEtcConfigProperty.JdbcConnectionConfigProperty jdbcConnectionConfigProperty = jdbcConnectionConfigPropertyOptional.get();
            JdbcConnectionOptions jdbcConnectionOptions = this.getJdbcConnectionOption(jdbcConnectionConfigProperty);
            if (!CollectionUtils.isEmpty((Collection)jdbcSubscriberContentConfigProperties)) {
                Collection batchSql = jdbcSubscriberContentConfigProperties.stream().map(ix -> ix.getContent()).collect(Collectors.toList());
                batchSql.add(String.format("INSERT INTO `companyinfocollect`.`collect_logs`(`business_id`, `business_table_name`, `db_partition`, `last_sync_version`, `created_time`, `step`) VALUES (:%s, :%s,:%s,:%s, CURRENT_TIMESTAMP,'[[@step1@]]')", "id", "fullTable", "partition", "lastSyncVersion"));
                Collection batchOrConditionExpress = jdbcSubscriberContentConfigProperties.stream().filter(ix -> StringUtils.hasLength((String)ix.getConditionExpress())).map(ix -> ix.getConditionExpress()).collect(Collectors.toList());
                DataEtcConfigProperty.JdbcSubscriberContentConfigProperty scp = (DataEtcConfigProperty.JdbcSubscriberContentConfigProperty)jdbcSubscriberContentConfigProperties.stream().findFirst().get();
                SinkFunction<BinlogRawValue> sinkFunction = null;
                switch (scp.getType()) {
                    case JDBC: {
                        sinkFunction = this.sync2JdbcSink(bootstrapServer, batchSql, batchOrConditionExpress, scp, jdbcConnectionOptions, jdbcExecutionOptions);
                        break;
                    }
                    default: {
                        throw new NoSupportEventException("\u4e0d\u652f\u6301\u8be5\u5f52\u96c6\u7c7b\u578b");
                    }
                }
                String uidHash = String.format("%s_%s", sourceUid, scp.getUid());
                if (!this._calculatedActionIdHash.add(uidHash)) {
                    throw new BadEventException("\u8be5\u7b97\u5b50\u8282\u70b9\u5df2\u7ecf\u5b58\u50a8, \u8bf7\u786e\u4fdd\u540c\u4e00\u4e2a\u6587\u4ef6\u4e2d\u7684\u8be5\u6807\u9898\u4fe1\u606f\u662f\u552f\u4e00\u7684");
                }
                String operatorName = String.format("flink-\u7b97\u5b50-%s", scp.getTitle());
                if (operatorName.length() > 50) {
                    operatorName = operatorName.substring(0, 50);
                }
                operatorName = operatorName.concat(String.format("\u603b\u5171\u6267\u884c%s\u4e2a\u5b58\u50a8\u8fc7\u7a0b", batchSql.size()));
                KeyedStream keyedStream = binlogRawValueKeyedStream.keyBy((KeySelector & Serializable)ii -> {
                    JsonProviderImpl jsonProvider = new JsonProviderImpl();
                    String jsonValue = new String(ii.getValue());
                    JSONObject valueJsonObject = (JSONObject)jsonProvider.toObject(JSONObject.class, jsonValue);
                    JSONObject dataNode = valueJsonObject.getJSONObject("before");
                    if (dataNode == null) {
                        dataNode = valueJsonObject.getJSONObject("after");
                    }
                    String parallelismKeyValue = FlinkConstants.getExtractedParallelismOriginalKey((JSONObject)dataNode);
                    String pKey = FlinkConstants.getCalculatedParallelismKey((String)parallelismKeyValue);
                    return pKey;
                });
                keyedStream.addSink(sinkFunction).setParallelism(1).uid(String.format("sk:%s", uidHash)).name(operatorName);
            }
        });
    }

    protected JdbcConnectionOptions getJdbcConnectionOption(DataEtcConfigProperty.JdbcConnectionConfigProperty jdbcConnectionConfigProperty) {
        if (jdbcConnectionOptionsMap.containsKey(jdbcConnectionConfigProperty.getUrl())) {
            return jdbcConnectionOptionsMap.get(jdbcConnectionConfigProperty.getUrl());
        }
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbcConnectionConfigProperty.getUrl()).withDriverName(jdbcConnectionConfigProperty.getDriverClassName()).withUsername(jdbcConnectionConfigProperty.getUserName()).withPassword(jdbcConnectionConfigProperty.getPassword()).withConnectionCheckTimeoutSeconds(10).build();
        jdbcConnectionOptionsMap.put(jdbcConnectionConfigProperty.getUrl(), jdbcConnectionOptions);
        return jdbcConnectionOptionsMap.get(jdbcConnectionConfigProperty.getUrl());
    }

    protected SinkFunction<BinlogRawValue> sync2JdbcSink(String bootstrapServer, Collection<String> batchSql, Collection<String> batchOrConditionExpress, DataEtcConfigProperty.JdbcSubscriberContentConfigProperty scp, JdbcConnectionOptions jdbcConnectionOptions, JdbcExecutionOptions jdbcExecutionOptions) {
        String batchSqlContent = batchSql.stream().filter(ix -> StringUtils.hasLength((String)ix)).collect(Collectors.joining(";"));
        JdbcNameParameterSqlParserImpl nameParameterSqlParser = new JdbcNameParameterSqlParserImpl();
        ParseSqlParameter parse2 = nameParameterSqlParser.parse(batchSqlContent);
        List parameterIndexes = parse2.getParameterIndexes();
        List parameterNames = parse2.getParameterNames();
        PreparedStatementCreatorFactory pscf = parse2.getPscf();
        String sql = pscf.getSql();
        RedisConfig redisConfig = RedisConfig.getDefaultFromMainThread();
        CustomJdbcOutputFormatParameterWrapper jdbcOutputFormatParameterWrapper = CustomJdbcOutputFormatParameterWrapper.create(parameterNames, jdbcConnectionOptions, parameterIndexes, sql);
        JdbcOutputFormat.StatementExecutorFactory & Serializable statementExecutorFactory = (JdbcOutputFormat.StatementExecutorFactory & Serializable)context -> JdbcBatchStatementExecutor.simple((String)jdbcOutputFormatParameterWrapper.getSql(), (JdbcStatementBuilder & Serializable)(preparedStatement, ik) -> {}, Function.identity());
        GenericJdbcSinkFunction sinkFunction = new GenericJdbcSinkFunction((JdbcOutputFormat)new CustomJdbcOutputFormat(batchOrConditionExpress, jdbcConnectionOptions, jdbcExecutionOptions, statementExecutorFactory, sql, bootstrapServer, redisConfig, jdbcOutputFormatParameterWrapper, new CustomJdbcAcceptPreparedStatementParameterImpl(), JdbcOutputFormat.RecordExtractor.identity()));
        return sinkFunction;
    }

    protected StreamExecutionEnvironment getStreamExecutionEnvironment(CheckpointConfigProperty configProperty) {
        StreamExecutionEnvironment env = null;
        Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, (Object)120L);
        ParameterTool parameterTool = configProperty.getArgParameter();
        if (parameterTool != null) {
            List configOptions = Stream.of(JobManagerOptions.PORT, JobManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.NUM_TASK_SLOTS, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.MANAGED_MEMORY_SIZE, TaskManagerOptions.MANAGED_MEMORY_SIZE, RestOptions.PORT).collect(Collectors.toList());
            for (ConfigOption selectedConfigOption : configOptions) {
                String optionValue = parameterTool.get(selectedConfigOption.key());
                if (!StringUtils.hasLength((String)optionValue)) continue;
                configuration.set(selectedConfigOption, (Object)Integer.parseInt(optionValue));
            }
        }
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend)new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(configProperty.getCheckpointPath());
        logger.error("etc.checkpoint\u7684PointStorage\u4f4d\u7f6e={};", (Object)configProperty.getCheckpointPath());
        env.enableCheckpointing(600000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        env.getCheckpointConfig().setCheckpointTimeout(1600000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getConfig().setAutoWatermarkInterval(60000L);
        return env;
    }

    protected KafkaRecordDeserializationSchema<BinlogRawValue> getDeserializationSchema() {
        KafkaDeserializationSchemaWrapper binlogRawValueKafkaDeserializationSchemaWrapper = new KafkaDeserializationSchemaWrapper((DeserializationSchema)new AbstractDeserializationSchema<BinlogRawValue>(){

            public BinlogRawValue deserialize(byte[] message) throws IOException {
                return BinlogRawValue.create(message, (JsonProvider)new JsonProviderImpl());
            }
        }){
            private Map<String, Long> topicOffsetMap;

            public void open(DeserializationSchema.InitializationContext context) throws Exception {
                super.open(context);
                this.topicOffsetMap = new HashMap<String, Long>();
            }
        };
        KafkaRecordDeserializationSchema kafkaRecordDeserializationSchema = KafkaRecordDeserializationSchema.of((KafkaDeserializationSchema)binlogRawValueKafkaDeserializationSchemaWrapper);
        return kafkaRecordDeserializationSchema;
    }
}

