/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.tenant.data.etc.tasks.components;

import com.bcxin.flink.streaming.cores.utils.KafkaUtils;
import com.bcxin.tenant.data.etc.tasks.dtos.DtqRecordDto;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaOutputFormat
extends RichOutputFormat<DtqRecordDto> {
    private final Queue<DtqRecordDto> batchQueue = new LinkedList<DtqRecordDto>();
    private final Map<String, Object> kafkaPropertyConfig;
    private final int batchIntervalMs;
    private final int batchSize;
    private final String topic;
    private transient KafkaProducer<String, String> kafkaProducer;
    private transient ScheduledFuture<?> scheduledFuture;
    private final String bootstrapServer;

    public KafkaOutputFormat(String bootstrapServer, String topic, int batchSize, int batchIntervalMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.bootstrapServer = bootstrapServer;
        this.kafkaPropertyConfig = new HashMap<String, Object>();
        this.kafkaPropertyConfig.put("bootstrap.servers", bootstrapServer);
        this.kafkaPropertyConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaPropertyConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.topic = topic;
    }

    public void configure(Configuration parameters) {
    }

    public void open(int taskNumber, int numTasks) throws IOException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("kafka-dlq-output-format"));
        this.scheduledFuture = scheduler.scheduleWithFixedDelay(() -> {
            Class<KafkaOutputFormat> clazz = KafkaOutputFormat.class;
            synchronized (KafkaOutputFormat.class) {
                this.flush();
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
        }, this.batchIntervalMs, this.batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void writeRecord(DtqRecordDto record) throws IOException {
        this.batchQueue.add(record);
        if (this.batchQueue.size() < this.batchSize) return;
        Class<KafkaOutputFormat> clazz = KafkaOutputFormat.class;
        synchronized (KafkaOutputFormat.class) {
            this.flush();
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    public void close() throws IOException {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    private void flush() {
        if (this.batchQueue.isEmpty()) return;
        Class<KafkaOutputFormat> clazz = KafkaOutputFormat.class;
        // MONITORENTER : com.bcxin.tenant.data.etc.tasks.components.KafkaOutputFormat.class
        while (true) {
            if (this.batchQueue.isEmpty()) {
                this.batchQueue.clear();
                // MONITOREXIT : clazz
                return;
            }
            if (this.kafkaProducer == null) {
                Class<KafkaUtils> clazz2 = KafkaUtils.class;
                // MONITORENTER : com.bcxin.flink.streaming.cores.utils.KafkaUtils.class
                if (this.kafkaProducer == null) {
                    this.kafkaProducer = new KafkaProducer(this.kafkaPropertyConfig);
                    Properties properties = new Properties();
                    properties.setProperty("bootstrap.servers", this.bootstrapServer);
                    properties.setProperty("client.id", String.format("admin-client-ensure-topic-%s", Thread.currentThread().getId()));
                    KafkaUtils.ensureTopic((String)this.topic, (Properties)properties);
                }
                // MONITOREXIT : clazz2
            }
            DtqRecordDto value = this.batchQueue.poll();
            String key = String.format("%s.%s#%s", value.getDbName(), value.getTableName(), value.getId());
            int partition = Math.abs(key.hashCode()) % 3;
            ProducerRecord record = new ProducerRecord(this.topic, Integer.valueOf(partition), (Object)key, (Object)new String(value.getValue()));
            this.kafkaProducer.send(record);
        }
    }
}

