/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.cdcs.dynamic;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.flink.cdc.kafka.source.task.FlinkConstants;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.MongoWriter;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonNumber;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicDbCollectionMongoWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoWriteOptions writeOptions;
    private final MongoSerializationSchema<IN> serializationSchema;
    private final MongoSinkContext sinkContext;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    private final List<WriteModel<BsonDocument>> bulkRequests = new ArrayList<WriteModel<BsonDocument>>();
    private final Collector<WriteModel<BsonDocument>> collector;
    private final Counter numRecordsOut;
    private final MongoClient mongoClient;
    private boolean checkpointInProgress = false;
    private volatile long lastSendTime = 0L;
    private volatile long ackTime = Long.MAX_VALUE;
    private final JsonProvider jsonProvider;

    public DynamicDbCollectionMongoWriter(MongoConnectionOptions connectionOptions, MongoWriteOptions writeOptions, boolean flushOnCheckpoint, Sink.InitContext initContext, MongoSerializationSchema<IN> serializationSchema) {
        this.connectionOptions = (MongoConnectionOptions)Preconditions.checkNotNull((Object)connectionOptions);
        this.writeOptions = (MongoWriteOptions)Preconditions.checkNotNull((Object)writeOptions);
        this.serializationSchema = (MongoSerializationSchema)Preconditions.checkNotNull(serializationSchema);
        this.flushOnCheckpoint = flushOnCheckpoint;
        Preconditions.checkNotNull((Object)initContext);
        this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)initContext.getMailboxExecutor());
        SinkWriterMetricGroup metricGroup = (SinkWriterMetricGroup)Preconditions.checkNotNull((Object)initContext.metricGroup());
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numRecordsOut = metricGroup.getNumRecordsSendCounter();
        this.collector = new ListCollector(this.bulkRequests);
        this.sinkContext = new DefaultMongoSinkContext(initContext, writeOptions);
        try {
            SerializationSchema.InitializationContext initializationContext = initContext.asSerializationSchemaInitializationContext();
            serializationSchema.open(initializationContext, this.sinkContext, writeOptions);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the MongoEmitter", (Throwable)e);
        }
        this.mongoClient = MongoClients.create((String)connectionOptions.getUri());
        this.jsonProvider = new JsonProviderImpl();
    }

    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        WriteModel writeModel = this.serializationSchema.serialize(element, this.sinkContext);
        this.numRecordsOut.inc();
        this.collector.collect((Object)writeModel);
        if (this.isOverMaxBatchSizeLimit() || this.isOverMaxBatchIntervalLimit()) {
            this.doBulkWrite();
        }
    }

    public void flush(boolean endOfInput) throws IOException {
        this.checkpointInProgress = true;
        while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || endOfInput)) {
            this.doBulkWrite();
        }
        this.checkpointInProgress = false;
    }

    public void close() {
        this.mongoClient.close();
    }

    @VisibleForTesting
    void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            return;
        }
        int maxRetries = this.writeOptions.getMaxRetries();
        long retryIntervalMs = this.writeOptions.getRetryIntervalMs();
        for (int i = 0; i <= maxRetries; ++i) {
            try {
                this.lastSendTime = System.currentTimeMillis();
                this.executeBulkRequests(this.bulkRequests);
                this.ackTime = System.currentTimeMillis();
                this.bulkRequests.clear();
                break;
            }
            catch (MongoException e) {
                LOG.debug("Bulk Write to MongoDB failed, retry times = {}", (Object)i, (Object)e);
                if (i >= maxRetries) {
                    LOG.error("Bulk Write to MongoDB failed", (Throwable)e);
                    throw new IOException(e);
                }
                try {
                    Thread.sleep(retryIntervalMs * (long)(i + 1));
                    continue;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    private boolean isOverMaxBatchSizeLimit() {
        int bulkActions = this.writeOptions.getBatchSize();
        return bulkActions != -1 && this.bulkRequests.size() >= bulkActions;
    }

    private boolean isOverMaxBatchIntervalLimit() {
        long bulkFlushInterval = this.writeOptions.getBatchIntervalMs();
        long lastSentInterval = System.currentTimeMillis() - this.lastSendTime;
        return bulkFlushInterval != -1L && lastSentInterval >= bulkFlushInterval;
    }

    private void executeBulkRequests(List<WriteModel<BsonDocument>> bulkRequests) {
        HashMap groups = new HashMap();
        for (WriteModel<BsonDocument> documentWriteModel : bulkRequests) {
            String table;
            InsertOneModel insertOneModel = (InsertOneModel)documentWriteModel;
            BsonDocument document = (BsonDocument)insertOneModel.getDocument();
            BsonDocument source = document.getDocument((Object)"source");
            String db = source.get((Object)"db").asString().getValue();
            String groupKey = String.format("%s|%s", db, table = source.get((Object)"table").asString().getValue());
            ArrayList<ReplaceOneModel> collection = (ArrayList<ReplaceOneModel>)groups.get(groupKey);
            if (collection == null) {
                collection = new ArrayList<ReplaceOneModel>();
                groups.put(groupKey, collection);
            }
            BsonValue beforeDataNode = document.get((Object)"before");
            BsonValue afterDataNode = document.get((Object)"after");
            BsonValue dataNode = afterDataNode;
            if (dataNode == null || dataNode instanceof BsonNull) {
                dataNode = beforeDataNode;
            }
            BsonDocument contentDocument = null;
            if (dataNode != null) {
                contentDocument = dataNode.asDocument();
                BsonDocument dataBodyNode = (BsonDocument)dataNode;
                Optional<String> keyOptional = dataBodyNode.keySet().stream().filter(ix -> ix.equalsIgnoreCase("id") || ix.equalsIgnoreCase("pk_id") || ix.equalsIgnoreCase("pkId")).findFirst();
                if (!keyOptional.isPresent()) {
                    throw new BadEventException(String.format("\u65e0\u6548\u6570\u636e; \u627e\u4e0d\u5230\u8282\u70b9\u4e3bId=%s", this.jsonProvider.getJson((Object)document)));
                }
                BsonValue keyValue = dataBodyNode.get((Object)keyOptional.get());
                Collection dateTimeKeys = contentDocument.keySet().stream().filter(ix -> FlinkConstants.isDateTimeField(ix)).collect(Collectors.toList());
                for (String key : dateTimeKeys) {
                    BsonValue bsonKeyValue = dataBodyNode.get((Object)key);
                    if (bsonKeyValue instanceof BsonNumber) {
                        Object value = FlinkConstants.formatValue(key, bsonKeyValue.asNumber());
                        if (value instanceof BsonValue) continue;
                        contentDocument.put(key, (BsonValue)new BsonString(String.valueOf(value)));
                        continue;
                    }
                    if (bsonKeyValue != null && !(bsonKeyValue instanceof BsonNull)) continue;
                    contentDocument.put(key, bsonKeyValue);
                }
                contentDocument.put("_id", keyValue);
                contentDocument.put("__meta.source", (BsonValue)source);
                contentDocument.put("__meta.op", document.get((Object)"op"));
                contentDocument.put("__meta.ts_ms", document.get((Object)"ts_ms"));
                if (beforeDataNode != dataNode) {
                    contentDocument.put("__meta.before", beforeDataNode);
                }
                contentDocument.put("__meta.sync_time", (BsonValue)new BsonString(new Date().toString()));
                Bson bsonIdFilter = Filters.eq((String)"_id", (Object)keyValue);
                ReplaceOneModel replaceOneDocument = new ReplaceOneModel(bsonIdFilter, (Object)contentDocument);
                collection.add(replaceOneDocument);
                continue;
            }
            LOG.error("\u65e0\u6548\u6570\u636e; \u627e\u4e0d\u5230\u8282\u70b9\u6570\u636e:{}", (Object)document);
        }
        for (String dbTableKey : groups.keySet()) {
            String[] dbTableArray = dbTableKey.split("\\|");
            String db = dbTableArray[0].toLowerCase();
            String table = dbTableArray[1].toLowerCase();
            List requests = (List)groups.get(dbTableKey);
            MongoCollection mongoCollection = this.mongoClient.getDatabase(db).getCollection(table, BsonDocument.class);
            mongoCollection.bulkWrite(requests);
            LOG.error("\u6570\u636e\u540c\u6b65\u5b8c\u6bd5:{}.{};size={}", new Object[]{db, table, requests.size()});
        }
    }
}

