package com.bcxin.tenant.data.etc.tasks.jobs;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.event.core.FlinkConstants;
import com.bcxin.event.core.FlinkJobAbstract;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.core.exceptions.NoSupportEventException;
import com.bcxin.event.core.jdbc.JdbcNameParameterSqlParserImpl;
import com.bcxin.event.core.jdbc.ParseSqlParameter;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.tenant.data.etc.tasks.components.BinlogRawValue;
import com.bcxin.tenant.data.etc.tasks.components.CustomJdbcOutputFormat;
import com.bcxin.tenant.data.etc.tasks.components.CustomJdbcOutputFormatParameterWrapper;
import com.bcxin.tenant.data.etc.tasks.components.impls.CustomJdbcAcceptPreparedStatementParameterImpl;
import com.bcxin.tenant.data.etc.tasks.properties.DataEtcConfigProperty;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/bcxin/tenant/data/etc/tasks/jobs/DataJobAbstract.class */
public abstract class DataJobAbstract extends FlinkJobAbstract {
    private final Set<String> _calculatedActionIdHash = Collections.synchronizedSet(new HashSet());
    private final Collection<DataEtcConfigProperty> configProperties;
    private final String configFile;
    private final boolean isDebug;
    private static final Logger logger = LoggerFactory.getLogger(DataEtcJob.class);
    private static Map<String, JdbcConnectionOptions> jdbcConnectionOptionsMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    public DataJobAbstract(Collection<DataEtcConfigProperty> collection, String str, boolean z) {
        this.configProperties = collection;
        this.configFile = str;
        this.isDebug = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void coreExecute() throws Exception {
        if (CollectionUtils.isEmpty(this.configProperties)) {
            throw new BadEventException("无效配置数据; 无法启动归集功能");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo> getMergedKafkaConnections(DataEtcConfigProperty dataEtcConfigProperty) {
        if (dataEtcConfigProperty == null || CollectionUtils.isEmpty(dataEtcConfigProperty.getTopicSubscribers())) {
            throw new BadEventException("无效主题配置数据; 无法启动归集功能");
        }
        return (Collection) dataEtcConfigProperty.getTopicSubscribers().stream().map(topicSubscriberConfigProperty -> {
            Optional<DataEtcConfigProperty.KafkaConnectionConfigProperty> findFirst = dataEtcConfigProperty.getKafkaConnections().stream().filter(kafkaConnectionConfigProperty -> {
                return kafkaConnectionConfigProperty.getName().equalsIgnoreCase(topicSubscriberConfigProperty.getRefKafkaName());
            }).findFirst();
            if (!findFirst.isPresent()) {
                throw new BadEventException(String.format("无效refKafkaName配置(%s), 无法找到对应的配置", topicSubscriberConfigProperty.getRefKafkaName()));
            }
            DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo create = DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo.create(findFirst.get());
            create.addTopic(topicSubscriberConfigProperty.getTopic());
            return create;
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void buildJdbcSubscriber(String str, SingleOutputStreamOperator<BinlogRawValue> singleOutputStreamOperator, JdbcExecutionOptions jdbcExecutionOptions, DataEtcConfigProperty dataEtcConfigProperty, Collection<DataEtcConfigProperty.JdbcSubscriberContentConfigProperty> collection, String str2) {
        if (CollectionUtils.isEmpty(collection)) {
            throw new BadEventException("无效的目标订阅配置");
        }
        collection.stream().map(jdbcSubscriberContentConfigProperty -> {
            return jdbcSubscriberContentConfigProperty.getRefJdbcName();
        }).distinct().forEach(str3 -> {
            Optional<DataEtcConfigProperty.JdbcConnectionConfigProperty> findFirst = dataEtcConfigProperty.getJdbcConnections().stream().filter(jdbcConnectionConfigProperty -> {
                return jdbcConnectionConfigProperty.getName().equalsIgnoreCase(str3);
            }).findFirst();
            if (!findFirst.isPresent()) {
                throw new BadEventException("找不到数据源配置");
            }
            JdbcConnectionOptions jdbcConnectionOption = getJdbcConnectionOption(findFirst.get());
            if (CollectionUtils.isEmpty(collection)) {
                return;
            }
            Collection<String> collection2 = (Collection) collection.stream().map(jdbcSubscriberContentConfigProperty2 -> {
                return jdbcSubscriberContentConfigProperty2.getContent();
            }).collect(Collectors.toList());
            collection2.add(String.format("INSERT INTO `companyinfocollect`.`collect_logs`(`business_id`, `business_table_name`, `db_partition`, `last_sync_version`, `created_time`, `step`) VALUES (:%s, :%s,:%s,:%s, CURRENT_TIMESTAMP,'[[@step1@]]')", BinlogRawValue.FIELD_ID, BinlogRawValue.FIELD_FULL_TABLE, BinlogRawValue.FIELD_PARTITION, BinlogRawValue.FIELD_LAST_SYNC_VERSION));
            Collection<String> collection3 = (Collection) collection.stream().filter(jdbcSubscriberContentConfigProperty3 -> {
                return StringUtils.hasLength(jdbcSubscriberContentConfigProperty3.getConditionExpress());
            }).map(jdbcSubscriberContentConfigProperty4 -> {
                return jdbcSubscriberContentConfigProperty4.getConditionExpress();
            }).collect(Collectors.toList());
            DataEtcConfigProperty.JdbcSubscriberContentConfigProperty jdbcSubscriberContentConfigProperty5 = (DataEtcConfigProperty.JdbcSubscriberContentConfigProperty) collection.stream().findFirst().get();
            switch (jdbcSubscriberContentConfigProperty5.getType()) {
                case JDBC:
                    SinkFunction<BinlogRawValue> sync2JdbcSink = sync2JdbcSink(str, collection2, collection3, jdbcSubscriberContentConfigProperty5, jdbcConnectionOption, jdbcExecutionOptions);
                    String format = String.format("%s_%s", str2, jdbcSubscriberContentConfigProperty5.getUid());
                    if (!this._calculatedActionIdHash.add(format)) {
                        throw new BadEventException("该算子节点已经存储, 请确保同一个文件中的该标题信息是唯一的");
                    }
                    String format2 = String.format("flink-算子-%s", jdbcSubscriberContentConfigProperty5.getTitle());
                    if (format2.length() > 50) {
                        format2 = format2.substring(0, 50);
                    }
                    singleOutputStreamOperator.keyBy(binlogRawValue -> {
                        JSONObject jSONObject = (JSONObject) new JsonProviderImpl().toObject(JSONObject.class, new String(binlogRawValue.getValue()));
                        JSONObject jSONObject2 = jSONObject.getJSONObject("before");
                        if (jSONObject2 == null) {
                            jSONObject2 = jSONObject.getJSONObject("after");
                        }
                        return FlinkConstants.getCalculatedParallelismKey(FlinkConstants.getExtractedParallelismOriginalKey(jSONObject2));
                    }).addSink(sync2JdbcSink).setParallelism(1).uid(String.format("sk:%s", format)).name(format2.concat(String.format("总共执行%s个存储过程", Integer.valueOf(collection2.size()))));
                    return;
                default:
                    throw new NoSupportEventException("不支持该归集类型");
            }
        });
    }

    protected JdbcConnectionOptions getJdbcConnectionOption(DataEtcConfigProperty.JdbcConnectionConfigProperty jdbcConnectionConfigProperty) {
        if (jdbcConnectionOptionsMap.containsKey(jdbcConnectionConfigProperty.getUrl())) {
            return jdbcConnectionOptionsMap.get(jdbcConnectionConfigProperty.getUrl());
        }
        jdbcConnectionOptionsMap.put(jdbcConnectionConfigProperty.getUrl(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbcConnectionConfigProperty.getUrl()).withDriverName(jdbcConnectionConfigProperty.getDriverClassName()).withUsername(jdbcConnectionConfigProperty.getUserName()).withPassword(jdbcConnectionConfigProperty.getPassword()).withConnectionCheckTimeoutSeconds(10).build());
        return jdbcConnectionOptionsMap.get(jdbcConnectionConfigProperty.getUrl());
    }

    protected SinkFunction<BinlogRawValue> sync2JdbcSink(String str, Collection<String> collection, Collection<String> collection2, DataEtcConfigProperty.JdbcSubscriberContentConfigProperty jdbcSubscriberContentConfigProperty, JdbcConnectionOptions jdbcConnectionOptions, JdbcExecutionOptions jdbcExecutionOptions) {
        ParseSqlParameter parse = new JdbcNameParameterSqlParserImpl().parse((String) collection.stream().filter(str2 -> {
            return StringUtils.hasLength(str2);
        }).collect(Collectors.joining(";")));
        List parameterIndexes = parse.getParameterIndexes();
        List parameterNames = parse.getParameterNames();
        String sql = parse.getPscf().getSql();
        RedisConfig defaultFromMainThread = RedisConfig.getDefaultFromMainThread();
        CustomJdbcOutputFormatParameterWrapper create = CustomJdbcOutputFormatParameterWrapper.create(parameterNames, jdbcConnectionOptions, parameterIndexes, sql);
        return new GenericJdbcSinkFunction(new CustomJdbcOutputFormat(collection2, jdbcConnectionOptions, jdbcExecutionOptions, obj -> {
            return JdbcBatchStatementExecutor.simple(create.getSql(), (preparedStatement, obj) -> {
            }, Function.identity());
        }, sql, str, defaultFromMainThread, create, new CustomJdbcAcceptPreparedStatementParameterImpl(), JdbcOutputFormat.RecordExtractor.identity()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamExecutionEnvironment getStreamExecutionEnvironment(CheckpointConfigProperty checkpointConfigProperty) {
        Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, 120L);
        ParameterTool argParameter = checkpointConfigProperty.getArgParameter();
        if (argParameter != null) {
            for (ConfigOption configOption : (List) Stream.of((Object[]) new ConfigOption[]{JobManagerOptions.PORT, JobManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.NUM_TASK_SLOTS, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.MANAGED_MEMORY_SIZE, TaskManagerOptions.MANAGED_MEMORY_SIZE, RestOptions.PORT}).collect(Collectors.toList())) {
                String str = argParameter.get(configOption.key());
                if (StringUtils.hasLength(str)) {
                    configuration.set(configOption, Integer.valueOf(Integer.parseInt(str)));
                }
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend());
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(checkpointConfigProperty.getCheckpointPath());
        logger.error("etc.checkpoint的PointStorage位置={};", checkpointConfigProperty.getCheckpointPath());
        executionEnvironment.enableCheckpointing(600000L);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(1600000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        executionEnvironment.getConfig().setAutoWatermarkInterval(60000L);
        return executionEnvironment;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaRecordDeserializationSchema<BinlogRawValue> getDeserializationSchema() {
        return KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchemaWrapper(new AbstractDeserializationSchema<BinlogRawValue>() { // from class: com.bcxin.tenant.data.etc.tasks.jobs.DataJobAbstract.1
            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public BinlogRawValue m14deserialize(byte[] bArr) throws IOException {
                return BinlogRawValue.create(bArr, new JsonProviderImpl());
            }
        }) { // from class: com.bcxin.tenant.data.etc.tasks.jobs.DataJobAbstract.2
            private Map<String, Long> topicOffsetMap;

            public void open(DeserializationSchema.InitializationContext initializationContext) throws Exception {
                super.open(initializationContext);
                this.topicOffsetMap = new HashMap();
            }
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1191674297:
                if (implMethodName.equals("lambda$buildJdbcSubscriber$dc16929f$1")) {
                    z = 2;
                    break;
                }
                break;
            case 791917749:
                if (implMethodName.equals("lambda$sync2JdbcSink$3e52fd65$1")) {
                    z = false;
                    break;
                }
                break;
            case 1595900370:
                if (implMethodName.equals("lambda$sync2JdbcSink$96c7e706$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/JdbcStatementBuilder") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/tasks/jobs/DataJobAbstract") && serializedLambda.getImplMethodSignature().equals("(Ljava/sql/PreparedStatement;Ljava/lang/Object;)V")) {
                    return (preparedStatement, obj) -> {
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/internal/JdbcOutputFormat$StatementExecutorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/tasks/jobs/DataJobAbstract") && serializedLambda.getImplMethodSignature().equals("(Lcom/bcxin/tenant/data/etc/tasks/components/CustomJdbcOutputFormatParameterWrapper;Ljava/lang/Object;)Ljava/lang/Object;")) {
                    CustomJdbcOutputFormatParameterWrapper customJdbcOutputFormatParameterWrapper = (CustomJdbcOutputFormatParameterWrapper) serializedLambda.getCapturedArg(0);
                    return obj2 -> {
                        return JdbcBatchStatementExecutor.simple(customJdbcOutputFormatParameterWrapper.getSql(), (preparedStatement2, obj2) -> {
                        }, Function.identity());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/tasks/jobs/DataJobAbstract") && serializedLambda.getImplMethodSignature().equals("(Lcom/bcxin/tenant/data/etc/tasks/components/BinlogRawValue;)Ljava/lang/String;")) {
                    return binlogRawValue -> {
                        JSONObject jSONObject = (JSONObject) new JsonProviderImpl().toObject(JSONObject.class, new String(binlogRawValue.getValue()));
                        JSONObject jSONObject2 = jSONObject.getJSONObject("before");
                        if (jSONObject2 == null) {
                            jSONObject2 = jSONObject.getJSONObject("after");
                        }
                        return FlinkConstants.getCalculatedParallelismKey(FlinkConstants.getExtractedParallelismOriginalKey(jSONObject2));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
