package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.streaming.cores.utils.DebeziumJsonNodeDtoUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterType;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/compnents/JsonBinlogMetaDebeziumDeserializationSchema.class */
public class JsonBinlogMetaDebeziumDeserializationSchema implements DebeziumDeserializationSchema<BinlogCdcValue> {
    private static final long serialVersionUID = 1;
    private transient JsonConverter jsonConverter;
    private final Boolean includeSchema;
    private Map<String, Object> customConverterConfigs;
    private final JsonProvider jsonProvider;

    public JsonBinlogMetaDebeziumDeserializationSchema() {
        this(false);
    }

    public JsonBinlogMetaDebeziumDeserializationSchema(Boolean bool) {
        this.includeSchema = bool;
        this.jsonProvider = new JsonProviderImpl();
    }

    public JsonBinlogMetaDebeziumDeserializationSchema(Boolean bool, Map<String, Object> map) {
        this.includeSchema = bool;
        this.customConverterConfigs = map;
        this.jsonProvider = new JsonProviderImpl();
    }

    public void deserialize(SourceRecord sourceRecord, Collector<BinlogCdcValue> collector) throws Exception {
        if (this.jsonConverter == null) {
            initializeJsonConverter();
        }
        collector.collect(translate2BinlogCdcValue(sourceRecord, this.jsonConverter.fromConnectData(sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value())));
    }

    private void initializeJsonConverter() {
        this.jsonConverter = new JsonConverter();
        HashMap hashMap = new HashMap(2);
        hashMap.put("converter.type", ConverterType.VALUE.getName());
        hashMap.put("schemas.enable", this.includeSchema);
        if (this.customConverterConfigs != null) {
            hashMap.putAll(this.customConverterConfigs);
        }
        this.jsonConverter.configure(hashMap);
    }

    public TypeInformation<BinlogCdcValue> getProducedType() {
        return BasicTypeInfo.of(BinlogCdcValue.class);
    }

    private BinlogCdcValue translate2BinlogCdcValue(SourceRecord sourceRecord, byte[] bArr) {
        String str = new String(bArr);
        JSONObject jSONObject = (JSONObject) this.jsonProvider.toObject(JSONObject.class, str);
        if (jSONObject == null) {
            throw new BadEventException(String.format("无效的数据:%s", str));
        }
        JSONObject jSONObject2 = jSONObject.getJSONObject("source");
        if (jSONObject2 == null) {
            throw new BadEventException(String.format("无效的数据:%s", str));
        }
        JSONObject jSONObject3 = jSONObject.getJSONObject("after");
        if (jSONObject3 == null) {
            jSONObject3 = jSONObject.getJSONObject("before");
        }
        Optional findFirst = jSONObject3.keySet().stream().filter(str2 -> {
            return str2.replace("_", "").equalsIgnoreCase("pkId");
        }).findFirst();
        if (!findFirst.isPresent()) {
            findFirst = jSONObject3.keySet().stream().filter(str3 -> {
                return str3.equalsIgnoreCase("id");
            }).findFirst();
        }
        if (!findFirst.isPresent()) {
            throw new BadEventException(String.format("找不到对应的主键:%s", str));
        }
        BinlogCdcValue create = BinlogCdcValue.create(jSONObject3.getString((String) findFirst.get()), jSONObject2.getString("db"), jSONObject2.getString("table"), str, DebeziumJsonNodeDtoUtils.getLastSyncTimeValue(jSONObject));
        HashMap hashMap = new HashMap();
        for (String str4 : sourceRecord.sourcePartition().keySet()) {
            hashMap.put(str4, sourceRecord.sourcePartition().get(str4));
        }
        for (String str5 : sourceRecord.sourceOffset().keySet()) {
            hashMap.put(str5, sourceRecord.sourceOffset().get(str5));
        }
        create.changeBinlog(hashMap);
        return create;
    }
}
