/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.tenant.data.etc.tasks.components;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.tenant.data.etc.tasks.components.BinlogRawValue;
import com.bcxin.tenant.data.etc.tasks.properties.DataEtcConfigProperty;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/**
 * Flink output format that buffers {@link BinlogRawValue} records in memory and
 * drains the buffer either when it reaches the configured batch size or on a
 * fixed-delay timer.
 *
 * <p>Threading: {@link #writeRecord(BinlogRawValue)}, {@link #close()} and the
 * timer task all synchronize on this instance, so at most one drain runs at a
 * time. A failure raised by the timer task is remembered and rethrown (wrapped
 * in an {@link IOException}) on the next {@code writeRecord} or {@code close}.
 *
 * <p>NOTE(review): {@code configProperty}, {@code subscriberContentConfigProperty}
 * and {@code jsonProvider} are stored but not read anywhere in this class, and
 * {@code flush()} currently discards the drained records. The actual sink write
 * appears to be missing — confirm against the original (non-decompiled) source.
 */
public class CustomRedisJdbcSinkRichOutFormat
extends RichOutputFormat<BinlogRawValue>
implements Serializable {
    private final Queue<BinlogRawValue> batch = new LinkedBlockingQueue<BinlogRawValue>();
    private final DataEtcConfigProperty.RedisConnectionConfigProperty configProperty;
    private final DataEtcConfigProperty.JdbcSubscriberContentConfigProperty subscriberContentConfigProperty;
    private final JdbcExecutionOptions executionOptions;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    // Written by the timer task and read by writeRecord/close, always under the instance monitor.
    private transient Exception flushException;
    private final JsonProvider jsonProvider;

    /**
     * Creates the output format.
     *
     * @param executionOptions supplies the batch size and the batch interval (milliseconds)
     * @param configProperty Redis connection settings (stored; not read in this class)
     * @param subscriberContentConfigProperty subscriber settings (stored; not read in this class)
     */
    public CustomRedisJdbcSinkRichOutFormat(JdbcExecutionOptions executionOptions, DataEtcConfigProperty.RedisConnectionConfigProperty configProperty, DataEtcConfigProperty.JdbcSubscriberContentConfigProperty subscriberContentConfigProperty) {
        this.configProperty = configProperty;
        this.executionOptions = executionOptions;
        this.subscriberContentConfigProperty = subscriberContentConfigProperty;
        this.jsonProvider = new JsonProviderImpl();
    }

    /** No configuration is read from {@code parameters}. */
    public void configure(Configuration parameters) {
    }

    /**
     * Starts the periodic flush timer when a positive batch interval is configured.
     *
     * @param taskNumber index of this parallel instance (unused)
     * @param numTasks total number of parallel instances (unused)
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    public void open(int taskNumber, int numTasks) throws IOException {
        long intervalMs = this.executionOptions.getBatchIntervalMs();
        if (intervalMs <= 0L) {
            // scheduleWithFixedDelay rejects a non-positive delay with IllegalArgumentException;
            // treat such an interval as "no timed flush" and rely on the size trigger and close().
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("jdbc-upsert-output-format"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                try {
                    this.flush();
                }
                catch (Exception e) {
                    // Remember the failure so the task thread can surface it later.
                    this.flushException = e;
                }
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers one record and drains the buffer once it holds at least the
     * configured batch size.
     *
     * @param record the record to buffer
     * @throws IOException if an earlier timer-driven flush failed
     */
    public synchronized void writeRecord(BinlogRawValue record) throws IOException {
        this.checkFlushException();
        this.batch.add(record);
        if (this.batch.size() >= this.executionOptions.getBatchSize()) {
            // flush() leaves the queue empty, so no separate clear() is required.
            this.flush();
        }
    }

    /**
     * Stops the flush timer, drains any records still buffered and reports a
     * pending timer-flush failure.
     *
     * @throws IOException if an earlier timer-driven flush failed
     */
    public synchronized void close() throws IOException {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduledFuture = null;
        }
        if (this.scheduler != null) {
            // shutdown() does not block, so holding the monitor here cannot deadlock
            // with a timer task that is waiting to enter its synchronized section.
            this.scheduler.shutdown();
            this.scheduler = null;
        }
        this.flush();
        this.checkFlushException();
    }

    /** Rethrows a failure recorded by the timer task, preserving it as the cause. */
    private void checkFlushException() throws IOException {
        if (this.flushException != null) {
            throw new IOException("Flushing buffered records failed.", this.flushException);
        }
    }

    /**
     * Removes every currently buffered record from the queue.
     *
     * <p>Polls until the queue reports empty instead of trusting {@code size()},
     * so a {@code null} can never be collected.
     */
    private void flush() {
        ArrayList<BinlogRawValue> data = new ArrayList<BinlogRawValue>();
        BinlogRawValue next;
        while ((next = this.batch.poll()) != null) {
            data.add(next);
        }
        // NOTE(review): the drained records in `data` are not written anywhere;
        // the sink write that presumably belongs here is absent from this source.
    }
}

