/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.streaming.cores.utils.DebeziumJsonNodeDtoUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterType;

public class JsonBinlogMetaDebeziumDeserializationSchema
implements DebeziumDeserializationSchema<BinlogCdcValue> {
    private static final long serialVersionUID = 1L;
    private transient JsonConverter jsonConverter;
    private final Boolean includeSchema;
    private Map<String, Object> customConverterConfigs;
    private final JsonProvider jsonProvider;

    public JsonBinlogMetaDebeziumDeserializationSchema() {
        this(false);
    }

    public JsonBinlogMetaDebeziumDeserializationSchema(Boolean includeSchema) {
        this.includeSchema = includeSchema;
        this.jsonProvider = new JsonProviderImpl();
    }

    public JsonBinlogMetaDebeziumDeserializationSchema(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
        this.jsonProvider = new JsonProviderImpl();
    }

    public void deserialize(SourceRecord record, Collector<BinlogCdcValue> out) throws Exception {
        if (this.jsonConverter == null) {
            this.initializeJsonConverter();
        }
        byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        BinlogCdcValue cdcValue = this.translate2BinlogCdcValue(record, bytes);
        out.collect((Object)cdcValue);
    }

    private void initializeJsonConverter() {
        this.jsonConverter = new JsonConverter();
        HashMap<String, Object> configs = new HashMap<String, Object>(2);
        configs.put("converter.type", ConverterType.VALUE.getName());
        configs.put("schemas.enable", this.includeSchema);
        if (this.customConverterConfigs != null) {
            configs.putAll(this.customConverterConfigs);
        }
        this.jsonConverter.configure(configs);
    }

    public TypeInformation<BinlogCdcValue> getProducedType() {
        return BasicTypeInfo.of(BinlogCdcValue.class);
    }

    private BinlogCdcValue translate2BinlogCdcValue(SourceRecord record, byte[] value) {
        Optional<String> keyOptional;
        String ix = new String(value);
        JSONObject jsonObject = (JSONObject)this.jsonProvider.toObject(JSONObject.class, ix);
        if (jsonObject == null) {
            throw new BadEventException(String.format("\u65e0\u6548\u7684\u6570\u636e:%s", ix));
        }
        JSONObject sourceNode = jsonObject.getJSONObject("source");
        if (sourceNode == null) {
            throw new BadEventException(String.format("\u65e0\u6548\u7684\u6570\u636e:%s", ix));
        }
        JSONObject dataNode = jsonObject.getJSONObject("after");
        if (dataNode == null) {
            dataNode = jsonObject.getJSONObject("before");
        }
        if (!(keyOptional = dataNode.keySet().stream().filter(ii -> ii.replace("_", "").equalsIgnoreCase("pkId")).findFirst()).isPresent()) {
            keyOptional = dataNode.keySet().stream().filter(ii -> ii.equalsIgnoreCase("id")).findFirst();
        }
        if (!keyOptional.isPresent()) {
            throw new BadEventException(String.format("\u627e\u4e0d\u5230\u5bf9\u5e94\u7684\u4e3b\u952e:%s", ix));
        }
        String id = dataNode.getString(keyOptional.get());
        Date lastSyncTime = DebeziumJsonNodeDtoUtils.getLastSyncTimeValue((JSONObject)jsonObject);
        BinlogCdcValue cdcValue = BinlogCdcValue.create(id, sourceNode.getString("db"), sourceNode.getString("table"), ix, lastSyncTime);
        HashMap sourceMeta = new HashMap();
        for (String key : record.sourcePartition().keySet()) {
            sourceMeta.put(key, record.sourcePartition().get(key));
        }
        for (String key : record.sourceOffset().keySet()) {
            sourceMeta.put(key, record.sourceOffset().get(key));
        }
        cdcValue.changeBinlog(sourceMeta);
        return cdcValue;
    }
}

