/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.jobs;

import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.JobContext;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.CdcSourceMeta;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.interceptors.CustomProducerInterceptor;
import com.bcxin.flink.cdc.kafka.source.task.compnents.JdbcDbReaderComponent;
import com.bcxin.flink.cdc.kafka.source.task.jobs.DbStreamCdcJobAbstract;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.CdcDatabaseSourceProperty;
import com.bcxin.flink.cdc.kafka.source.task.utils.RecordHeadUtil;
import com.bcxin.flink.streaming.cores.utils.KafkaUtils;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbStreamCdcKafkaJob
extends DbStreamCdcJobAbstract {
    private static final Logger logger = LoggerFactory.getLogger(DbStreamCdcKafkaJob.class);
    private final JdbcDbReaderComponent jdbcDbReaderComponent;

    public DbStreamCdcKafkaJob(JdbcDbReaderComponent jdbcDbReaderComponent) {
        this.jdbcDbReaderComponent = jdbcDbReaderComponent;
    }

    @Override
    protected void executeCoreAction(StreamExecutionEnvironment env, SingleOutputStreamOperator<BinlogCdcValue> dataStreamSource, BinlogOffsetValue binlogOffsetValue) {
        this.executeTargetKafkaSink(dataStreamSource);
    }

    private void executeTargetKafkaSink(SingleOutputStreamOperator<BinlogCdcValue> sourceStream) {
        final JobContext jobContext = JobContext.getInstance();
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", jobContext.getKafkaConfigProperty().getBootstrapServer());
        if (!jobContext.isSkipBinlogRedisCalculated()) {
            RedisConfig redisConfig = RedisConfig.getDefaultFromMainThread();
            properties.setProperty("REDIS_CONFIG_KEY_HOST", redisConfig.getHost());
            properties.setProperty("REDIS_CONFIG_KEY_PORT", String.valueOf(redisConfig.getPort()));
            properties.setProperty("REDIS_CONFIG_KEY_PASSWORD", redisConfig.getPassword());
            properties.setProperty("interceptor.classes", CustomProducerInterceptor.class.getName());
        }
        properties.setProperty("client.id", String.format("%s_%s", jobContext.getKafkaConfigProperty().getConsumerGroupId(), UUID.randomUUID()));
        properties.setProperty("group.id", jobContext.getKafkaConfigProperty().getConsumerGroupId());
        KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder().setKafkaProducerConfig(properties);
        KafkaSink kafkaSink = kafkaSinkBuilder.setRecordSerializer((KafkaRecordSerializationSchema)new KafkaRecordSerializationSchema<BinlogCdcValue>(){
            private volatile Map<String, String> topicMap;

            public void open(SerializationSchema.InitializationContext context, KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) throws Exception {
                super.open(context, sinkContext);
                this.topicMap = new HashMap<String, String>();
            }

            public ProducerRecord<byte[], byte[]> serialize(BinlogCdcValue cdcValue, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
                JsonProviderImpl jsonProvider = new JsonProviderImpl();
                String element = cdcValue.getValue();
                CdcSourceMeta schema = (CdcSourceMeta)jsonProvider.toObject(CdcSourceMeta.class, element);
                if (schema == null || schema.getDbName() == null) {
                    logger.error("\u5f53\u524d\u65e0\u6548\u6570\u636e:{}", (Object)element);
                    throw new IllegalArgumentException(element);
                }
                String topicKey = jobContext.getKafkaConfigProperty().getActualTopicName(schema.getDbName(), schema.getTableName());
                String key = schema.getId();
                if (StringUtils.isEmpty((CharSequence)key)) {
                    key = element;
                }
                Headers headers = RecordHeadUtil.extractHeaders(schema, DbStreamCdcKafkaJob.this.jdbcDbReaderComponent);
                if (!topicKey.contains("single-partition")) {
                    if (!this.topicMap.containsKey(topicKey)) {
                        KafkaUtils.ensureTopic((String)topicKey, (Properties)properties);
                        this.topicMap.put(topicKey, Instant.now().toString());
                    }
                    return new ProducerRecord(topicKey, Integer.valueOf(schema.getPartition()), (Object)key.getBytes(), (Object)element.getBytes(), (Iterable)headers);
                }
                return new ProducerRecord(topicKey, Integer.valueOf(0), (Object)key.getBytes(), (Object)element.getBytes(), (Iterable)headers);
            }
        }).build();
        CdcDatabaseSourceProperty databaseProperty = jobContext.getDatabaseProperty();
        sourceStream.sinkTo((Sink)kafkaSink).uid(String.format("cdc-sink-%s-%s", databaseProperty.getServerId(), databaseProperty.getJobId())).setParallelism(4);
    }

    @Override
    protected String getJobPrefixTitle() {
        return "2kafka";
    }
}

