package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.CdcSourceMeta;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.interceptors.CustomProducerInterceptor;
import com.bcxin.flink.cdc.kafka.source.task.compnents.JdbcDbReaderComponent;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CdcDatabaseSourceProperty;
import com.bcxin.flink.cdc.kafka.source.task.utils.RecordHeadUtil;
import com.bcxin.flink.streaming.cores.utils.KafkaUtils;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/jobs/DbStreamCdcKafkaJob.class */
public class DbStreamCdcKafkaJob extends DbStreamCdcJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DbStreamCdcKafkaJob.class);
    private final JdbcDbReaderComponent jdbcDbReaderComponent;

    public DbStreamCdcKafkaJob(JdbcDbReaderComponent jdbcDbReaderComponent) {
        this.jdbcDbReaderComponent = jdbcDbReaderComponent;
    }

    @Override // com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcJobAbstract
    protected void executeCoreAction(StreamExecutionEnvironment streamExecutionEnvironment, SingleOutputStreamOperator<BinlogCdcValue> singleOutputStreamOperator, BinlogOffsetValue binlogOffsetValue) {
        executeTargetKafkaSink(singleOutputStreamOperator);
    }

    private void executeTargetKafkaSink(SingleOutputStreamOperator<BinlogCdcValue> singleOutputStreamOperator) {
        final JobContext jobContext = JobContext.getInstance();
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", jobContext.getKafkaConfigProperty().getBootstrapServer());
        if (!jobContext.isSkipBinlogRedisCalculated()) {
            RedisConfig defaultFromMainThread = RedisConfig.getDefaultFromMainThread();
            properties.setProperty(CustomProducerInterceptor.REDIS_CONFIG_KEY_HOST, defaultFromMainThread.getHost());
            properties.setProperty(CustomProducerInterceptor.REDIS_CONFIG_KEY_PORT, String.valueOf(defaultFromMainThread.getPort()));
            properties.setProperty(CustomProducerInterceptor.REDIS_CONFIG_KEY_PASSWORD, defaultFromMainThread.getPassword());
            properties.setProperty("interceptor.classes", CustomProducerInterceptor.class.getName());
        }
        properties.setProperty("client.id", String.format("%s_%s", jobContext.getKafkaConfigProperty().getConsumerGroupId(), UUID.randomUUID()));
        properties.setProperty("group.id", jobContext.getKafkaConfigProperty().getConsumerGroupId());
        KafkaSink build = KafkaSink.builder().setKafkaProducerConfig(properties).setRecordSerializer(new KafkaRecordSerializationSchema<BinlogCdcValue>() { // from class: com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcKafkaJob.1
            private volatile Map<String, String> topicMap;

            public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) throws Exception {
                super.open(initializationContext, kafkaSinkContext);
                this.topicMap = new HashMap();
            }

            public ProducerRecord<byte[], byte[]> serialize(BinlogCdcValue binlogCdcValue, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long l) {
                JsonProviderImpl jsonProviderImpl = new JsonProviderImpl();
                String value = binlogCdcValue.getValue();
                CdcSourceMeta cdcSourceMeta = (CdcSourceMeta) jsonProviderImpl.toObject(CdcSourceMeta.class, value);
                if (cdcSourceMeta == null || cdcSourceMeta.getDbName() == null) {
                    DbStreamCdcKafkaJob.logger.error("当前无效数据:{}", value);
                    throw new IllegalArgumentException(value);
                }
                String actualTopicName = jobContext.getKafkaConfigProperty().getActualTopicName(cdcSourceMeta.getDbName(), cdcSourceMeta.getTableName());
                String id = cdcSourceMeta.getId();
                if (StringUtils.isEmpty(id)) {
                    id = value;
                }
                Headers extractHeaders = RecordHeadUtil.extractHeaders(cdcSourceMeta, DbStreamCdcKafkaJob.this.jdbcDbReaderComponent);
                if (actualTopicName.contains("single-partition")) {
                    return new ProducerRecord<>(actualTopicName, 0, id.getBytes(), value.getBytes(), extractHeaders);
                }
                if (!this.topicMap.containsKey(actualTopicName)) {
                    KafkaUtils.ensureTopic(actualTopicName, properties);
                    this.topicMap.put(actualTopicName, Instant.now().toString());
                }
                return new ProducerRecord<>(actualTopicName, Integer.valueOf(cdcSourceMeta.getPartition()), id.getBytes(), value.getBytes(), extractHeaders);
            }
        }).build();
        CdcDatabaseSourceProperty databaseProperty = jobContext.getDatabaseProperty();
        singleOutputStreamOperator.sinkTo(build).uid(String.format("cdc-sink-%s-%s", databaseProperty.getServerId(), databaseProperty.getJobId())).setParallelism(4);
    }

    @Override // com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcJobAbstract
    protected String getJobPrefixTitle() {
        return "2kafka";
    }
}
