package com.bcxin.tenant.data.etc.tasks.jobs;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.flink.streaming.cores.properties.CheckpointConfigProperty;
import com.bcxin.tenant.data.etc.tasks.components.BinlogRawValue;
import com.bcxin.tenant.data.etc.tasks.properties.DataEtcConfigProperty;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Iterator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bcxin/tenant/data/etc/tasks/jobs/DataEtcJob.class */
public class DataEtcJob extends DataJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DataEtcJob.class);
    private final Collection<DataEtcConfigProperty> configProperties;
    private final String primaryKeyName = "id";
    private final String env;
    private final String configFile;
    private final boolean isDebug;
    private final CheckpointConfigProperty checkpointConfigProperty;

    public DataEtcJob(CheckpointConfigProperty checkpointConfigProperty, Collection<DataEtcConfigProperty> collection, String str, String str2, boolean z) {
        super(collection, str2, z);
        this.primaryKeyName = BinlogRawValue.FIELD_ID;
        this.checkpointConfigProperty = checkpointConfigProperty;
        this.configProperties = collection;
        this.env = str;
        this.configFile = str2;
        this.isDebug = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.bcxin.tenant.data.etc.tasks.jobs.DataJobAbstract
    public void coreExecute() throws Exception {
        super.coreExecute();
        StreamExecutionEnvironment streamExecutionEnvironment = getStreamExecutionEnvironment(this.checkpointConfigProperty);
        KafkaRecordDeserializationSchema<BinlogRawValue> deserializationSchema = getDeserializationSchema();
        JdbcExecutionOptions build = JdbcExecutionOptions.builder().withBatchSize(500).withBatchIntervalMs(60000L).withMaxRetries(3).build();
        Iterator<DataEtcConfigProperty> it = this.configProperties.iterator();
        while (it.hasNext()) {
            buildConfigSourceAndSink(streamExecutionEnvironment, deserializationSchema, build, it.next());
        }
        String str = (String) this.configProperties.stream().map(dataEtcConfigProperty -> {
            return dataEtcConfigProperty.getTitle();
        }).distinct().collect(Collectors.joining(";"));
        streamExecutionEnvironment.registerJobListener(new JobListener() { // from class: com.bcxin.tenant.data.etc.tasks.jobs.DataEtcJob.1
            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
                if (jobClient != null) {
                    DataEtcJob.logger.error("Job以及提交到服务端: jobId={};status={};", jobClient.getJobID(), jobClient.getJobStatus());
                }
            }

            public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
                if (jobExecutionResult != null) {
                    DataEtcJob.logger.error("Job以及提交到服务端: jobId={};JobExecutionResult={};", jobExecutionResult.getJobID(), Boolean.valueOf(jobExecutionResult.isJobExecutionResult()));
                }
            }
        });
        streamExecutionEnvironment.execute(String.format("job-v18.3:%s-环境-%s-%s", this.env, str, this.configFile));
    }

    private void buildConfigSourceAndSink(StreamExecutionEnvironment streamExecutionEnvironment, KafkaRecordDeserializationSchema<BinlogRawValue> kafkaRecordDeserializationSchema, JdbcExecutionOptions jdbcExecutionOptions, DataEtcConfigProperty dataEtcConfigProperty) {
        if (dataEtcConfigProperty == null || CollectionUtils.isEmpty(dataEtcConfigProperty.getTopicSubscribers())) {
            throw new BadEventException("无效KafkaTopic配置数据信息");
        }
        getMergedKafkaConnections(dataEtcConfigProperty).forEach(tmpMergedKafkaConnectionTopicInfo -> {
            Collection collection = (Collection) dataEtcConfigProperty.getTopicSubscribers().stream().filter(topicSubscriberConfigProperty -> {
                return topicSubscriberConfigProperty.getRefKafkaName().equalsIgnoreCase(tmpMergedKafkaConnectionTopicInfo.getKafkaConnection().getName()) && tmpMergedKafkaConnectionTopicInfo.getTopics().stream().anyMatch(str -> {
                    return str.equalsIgnoreCase(topicSubscriberConfigProperty.getTopic());
                });
            }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(collection)) {
                throw new BadEventException("无效主题配置");
            }
            DataEtcConfigProperty.TopicSubscriberConfigProperty topicSubscriberConfigProperty2 = (DataEtcConfigProperty.TopicSubscriberConfigProperty) collection.stream().findFirst().get();
            if (collection.size() > 1) {
                throw new BadEventException(String.format("同一个主题(%s)在一个文件中必须只能只有一个主题订阅", topicSubscriberConfigProperty2.getTopic()));
            }
            String[] strArr = (String[]) tmpMergedKafkaConnectionTopicInfo.getTopics().toArray(new String[tmpMergedKafkaConnectionTopicInfo.getTopics().size()]);
            String format = String.format("g_%s_%s", dataEtcConfigProperty.getGroupId(), topicSubscriberConfigProperty2.getUid());
            String format2 = String.format("cli_%s_%s", dataEtcConfigProperty.getGroupId(), topicSubscriberConfigProperty2.getUid());
            String format3 = String.format("uid_%s_%s", this.configFile, format);
            int i = 5120;
            if ("others".equalsIgnoreCase(this.configFile)) {
                i = 512;
            }
            logger.error("init with configFile={} receiveBufferConfig={};", this.configFile, Integer.valueOf(i));
            buildJdbcSubscriber(tmpMergedKafkaConnectionTopicInfo.getKafkaConnection().getBootstrapServer(), streamExecutionEnvironment.fromSource(KafkaSource.builder().setBootstrapServers(tmpMergedKafkaConnectionTopicInfo.getKafkaConnection().getBootstrapServer()).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setGroupId(format).setClientIdPrefix(format2).setTopics(strArr).setDeserializer(kafkaRecordDeserializationSchema).setProperty("session.timeout.ms", "120000").setProperty("max.poll.interval.ms", "300000").setProperty("enable.auto.commit", "false").setProperty("fetch.max.bytes", String.valueOf(20971520)).setProperty("receive.buffer.bytes", String.valueOf(i)).setProperty("commit.offsets.on.checkpoint", "false").build(), WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(5L, ChronoUnit.MINUTES)).withTimestampAssigner((obj, j) -> {
                return ((BinlogRawValue) obj).getLastSyncVersion().getTime();
            }).withIdleness(Duration.ofSeconds(10L)), String.format("订阅v2-%s", topicSubscriberConfigProperty2.getTopic())).setParallelism(5).uid(format3), jdbcExecutionOptions, dataEtcConfigProperty, topicSubscriberConfigProperty2.getSubscriberContents(), format3);
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 452367369:
                if (implMethodName.equals("lambda$buildConfigSourceAndSink$e11eaac7$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/tasks/jobs/DataEtcJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;J)J")) {
                    return (obj, j) -> {
                        return ((BinlogRawValue) obj).getLastSyncVersion().getTime();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
