package com.bcxin.tenant.data.etc.tasks.components;

import com.bcxin.flink.streaming.cores.utils.KafkaUtils;
import com.bcxin.tenant.data.etc.tasks.dtos.DtqRecordDto;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/bcxin/tenant/data/etc/tasks/components/KafkaOutputFormat.class */
/**
 * Flink output format that buffers {@link DtqRecordDto} records in memory and sends them to a
 * Kafka topic in batches.
 *
 * <p>A batch is sent when the buffer reaches {@code batchSize} records, when the periodic timer
 * started by {@link #open(int, int)} fires, or when {@link #close()} is called.
 *
 * <p>The buffer and the lazily created producer are shared between the thread calling
 * {@link #writeRecord(DtqRecordDto)} and the timer thread; every access to them is guarded by
 * this instance's monitor.
 */
public class KafkaOutputFormat extends RichOutputFormat<DtqRecordDto> {
    /**
     * Modulus used to derive the explicit target partition from the record key.
     * NOTE(review): presumably the topic is expected to have at least this many partitions;
     * confirm against the topic configuration.
     */
    private static final int PARTITION_COUNT = 3;

    private final Queue<DtqRecordDto> batchQueue = new LinkedList<>();
    private final Map<String, Object> kafkaPropertyConfig = new HashMap<>();
    private final int batchIntervalMs;
    private final int batchSize;
    private final String topic;
    private transient KafkaProducer<String, String> kafkaProducer;
    /** Executor that runs the periodic flush; kept so that {@link #close()} can shut it down. */
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private final String bootstrapServer;

    /**
     * Creates the output format.
     *
     * @param bootstrapServer Kafka {@code bootstrap.servers} value
     * @param topic           topic the records are sent to
     * @param batchSize       number of buffered records that triggers an immediate flush
     * @param batchIntervalMs initial delay and delay between periodic flushes, in milliseconds
     */
    public KafkaOutputFormat(String bootstrapServer, String topic, int batchSize, int batchIntervalMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.bootstrapServer = bootstrapServer;
        this.kafkaPropertyConfig.put("bootstrap.servers", bootstrapServer);
        this.kafkaPropertyConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaPropertyConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.topic = topic;
    }

    /** No configuration is read; all settings are supplied through the constructor. */
    public void configure(Configuration configuration) {
    }

    /**
     * Starts the single-threaded timer that flushes the buffer every {@code batchIntervalMs}
     * milliseconds. Neither argument is used.
     */
    public void open(int i, int i2) throws IOException {
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("kafka-dlq-output-format"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
                this::flush, this.batchIntervalMs, this.batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers one record and flushes the buffer once it holds at least {@code batchSize} records.
     *
     * @param dtqRecordDto record to buffer
     */
    public void writeRecord(DtqRecordDto dtqRecordDto) throws IOException {
        // The add must happen under the same lock as the timer's poll loop: LinkedList is not
        // thread-safe and the timer thread drains it concurrently.
        synchronized (this) {
            this.batchQueue.add(dtqRecordDto);
            if (this.batchQueue.size() >= this.batchSize) {
                flush();
            }
        }
    }

    /**
     * Stops the periodic flush, sends whatever is still buffered and closes the producer.
     * Safe to call when {@link #open(int, int)} was never invoked.
     */
    public void close() throws IOException {
        if (this.scheduledFuture != null) {
            // Do not interrupt a flush that is already running; it finishes under the lock.
            this.scheduledFuture.cancel(false);
            this.scheduledFuture = null;
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
            this.scheduler = null;
        }
        synchronized (this) {
            try {
                // Records buffered since the last flush would otherwise be lost.
                flush();
            } finally {
                if (this.kafkaProducer != null) {
                    this.kafkaProducer.close();
                    this.kafkaProducer = null;
                }
            }
        }
    }

    /** Drains the buffer, sending every record to Kafka with an explicit partition and key. */
    private void flush() {
        synchronized (this) {
            if (this.batchQueue.isEmpty()) {
                return;
            }
            ensureProducer();
            DtqRecordDto record;
            while ((record = this.batchQueue.poll()) != null) {
                String key = String.format("%s.%s#%s", record.getDbName(), record.getTableName(), record.getId());
                // Taking the remainder before the absolute value keeps the result in
                // [0, PARTITION_COUNT) even when hashCode() is Integer.MIN_VALUE, for which
                // Math.abs(hash) is still negative. For every other hash the value is identical
                // to Math.abs(hash) % PARTITION_COUNT.
                int partition = Math.abs(key.hashCode() % PARTITION_COUNT);
                // NOTE(review): if getValue() returns a byte[], this decodes with the platform
                // default charset - confirm and consider passing an explicit charset.
                String value = new String(record.getValue());
                this.kafkaProducer.send(new ProducerRecord<String, String>(this.topic, partition, key, value));
            }
        }
    }

    /**
     * Creates the producer on first use and then asks {@link KafkaUtils#ensureTopic} to check
     * the topic. Must be called while holding this instance's monitor.
     */
    private void ensureProducer() {
        if (this.kafkaProducer != null) {
            return;
        }
        // KafkaUtils.class is shared by all instances, so only one of them runs the producer
        // creation and topic check at a time.
        synchronized (KafkaUtils.class) {
            if (this.kafkaProducer == null) {
                this.kafkaProducer = new KafkaProducer<>(this.kafkaPropertyConfig);
                Properties adminProperties = new Properties();
                adminProperties.setProperty("bootstrap.servers", this.bootstrapServer);
                adminProperties.setProperty("client.id",
                        String.format("admin-client-ensure-topic-%s", Thread.currentThread().getId()));
                KafkaUtils.ensureTopic(this.topic, adminProperties);
            }
        }
    }
}
