/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.tenant.data.etc.tasks.jobs;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.tenant.data.etc.tasks.components.BinlogRawValue;
import com.bcxin.tenant.data.etc.tasks.jobs.DataJobAbstract;
import com.bcxin.tenant.data.etc.tasks.properties.DataEtcConfigProperty;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Date;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataEtcJob
extends DataJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DataEtcJob.class);
    private final Collection<DataEtcConfigProperty> configProperties;
    private final String primaryKeyName = "id";
    private final String env;
    private final String configFile;
    private final boolean isDebug;
    private final CheckpointConfigProperty checkpointConfigProperty;

    public DataEtcJob(CheckpointConfigProperty checkpointConfigProperty, Collection<DataEtcConfigProperty> configProperties, String env, String configFile, boolean isDebug) {
        super(configProperties, configFile, isDebug);
        this.checkpointConfigProperty = checkpointConfigProperty;
        this.configProperties = configProperties;
        this.env = env;
        this.configFile = configFile;
        this.isDebug = isDebug;
    }

    @Override
    protected void coreExecute() throws Exception {
        super.coreExecute();
        StreamExecutionEnvironment env = this.getStreamExecutionEnvironment(this.checkpointConfigProperty);
        KafkaRecordDeserializationSchema<BinlogRawValue> kafkaRecordDeserializationSchema = this.getDeserializationSchema();
        JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder().withBatchSize(500).withBatchIntervalMs(60000L).withMaxRetries(3).build();
        for (DataEtcConfigProperty config : this.configProperties) {
            this.buildConfigSourceAndSink(env, kafkaRecordDeserializationSchema, jdbcExecutionOptions, config);
        }
        String title = this.configProperties.stream().map(ix -> ix.getTitle()).distinct().collect(Collectors.joining(";"));
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
                if (jobClient != null) {
                    logger.error("Job\u4ee5\u53ca\u63d0\u4ea4\u5230\u670d\u52a1\u7aef: jobId={};status={};", (Object)jobClient.getJobID(), (Object)jobClient.getJobStatus());
                }
            }

            public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
                if (jobExecutionResult != null) {
                    logger.error("Job\u4ee5\u53ca\u63d0\u4ea4\u5230\u670d\u52a1\u7aef: jobId={};JobExecutionResult={};", (Object)jobExecutionResult.getJobID(), (Object)jobExecutionResult.isJobExecutionResult());
                }
            }
        });
        env.execute(String.format("job-v18.3:%s-\u73af\u5883-%s-%s", this.env, title, this.configFile));
    }

    private void buildConfigSourceAndSink(StreamExecutionEnvironment env, KafkaRecordDeserializationSchema<BinlogRawValue> kafkaRecordDeserializationSchema, JdbcExecutionOptions jdbcExecutionOptions, DataEtcConfigProperty config) {
        if (config == null || CollectionUtils.isEmpty(config.getTopicSubscribers())) {
            throw new BadEventException("\u65e0\u6548KafkaTopic\u914d\u7f6e\u6570\u636e\u4fe1\u606f");
        }
        Collection<DataEtcConfigProperty.TmpMergedKafkaConnectionTopicInfo> kafkaConnectionTopicInfos = this.getMergedKafkaConnections(config);
        kafkaConnectionTopicInfos.forEach(kti -> {
            Collection selectedTopicSubscriberConfigs = config.getTopicSubscribers().stream().filter(ix -> ix.getRefKafkaName().equalsIgnoreCase(kti.getKafkaConnection().getName()) && kti.getTopics().stream().anyMatch(ii -> ii.equalsIgnoreCase(ix.getTopic()))).collect(Collectors.toList());
            if (CollectionUtils.isEmpty((Collection)selectedTopicSubscriberConfigs)) {
                throw new BadEventException("\u65e0\u6548\u4e3b\u9898\u914d\u7f6e");
            }
            DataEtcConfigProperty.TopicSubscriberConfigProperty selectedSubscriberConfig = (DataEtcConfigProperty.TopicSubscriberConfigProperty)selectedTopicSubscriberConfigs.stream().findFirst().get();
            if (selectedTopicSubscriberConfigs.size() > 1) {
                throw new BadEventException(String.format("\u540c\u4e00\u4e2a\u4e3b\u9898(%s)\u5728\u4e00\u4e2a\u6587\u4ef6\u4e2d\u5fc5\u987b\u53ea\u80fd\u53ea\u6709\u4e00\u4e2a\u4e3b\u9898\u8ba2\u9605", selectedSubscriberConfig.getTopic()));
            }
            String[] selectedTopics = kti.getTopics().toArray(new String[kti.getTopics().size()]);
            String groupId = String.format("g_%s_%s", config.getGroupId(), selectedSubscriberConfig.getUid());
            String clientId = String.format("cli_%s_%s", config.getGroupId(), selectedSubscriberConfig.getUid());
            String sourceUid = String.format("uid_%s_%s", this.configFile, groupId);
            int receiveBufferConfig = 5120;
            if ("others".equalsIgnoreCase(this.configFile)) {
                receiveBufferConfig = 512;
            }
            logger.error("init with configFile={} receiveBufferConfig={};", (Object)this.configFile, (Object)receiveBufferConfig);
            KafkaSource binLogKafkaSource = KafkaSource.builder().setBootstrapServers(kti.getKafkaConnection().getBootstrapServer()).setStartingOffsets(OffsetsInitializer.committedOffsets((OffsetResetStrategy)OffsetResetStrategy.LATEST)).setGroupId(groupId).setClientIdPrefix(clientId).setTopics(selectedTopics).setDeserializer(kafkaRecordDeserializationSchema).setProperty("session.timeout.ms", "120000").setProperty("max.poll.interval.ms", "300000").setProperty("enable.auto.commit", "false").setProperty("fetch.max.bytes", String.valueOf(0x1400000)).setProperty("receive.buffer.bytes", String.valueOf(receiveBufferConfig)).setProperty("commit.offsets.on.checkpoint", "false").build();
            WatermarkStrategy watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.of(5L, ChronoUnit.MINUTES)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(event, timestamp) -> {
                BinlogRawValue rawValue = (BinlogRawValue)event;
                Date lastSyncVersion = rawValue.getLastSyncVersion();
                return lastSyncVersion.getTime();
            }).withIdleness(Duration.ofSeconds(10L));
            SingleOutputStreamOperator debeziumJsonNodeDtoKeyedStream = env.fromSource((Source)binLogKafkaSource, watermarkStrategy, String.format("\u8ba2\u9605v2-%s", selectedSubscriberConfig.getTopic())).setParallelism(5).uid(sourceUid);
            this.buildJdbcSubscriber(kti.getKafkaConnection().getBootstrapServer(), (SingleOutputStreamOperator<BinlogRawValue>)debeziumJsonNodeDtoKeyedStream, jdbcExecutionOptions, config, selectedSubscriberConfig.getSubscriberContents(), sourceUid);
        });
    }
}

