package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.job.core.domain.JedisPoolFactory;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.StartupOptionUtil;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/compnents/BinlogCheckpointProcessFunction.class */
public class BinlogCheckpointProcessFunction extends KeyedProcessFunction<String, BinlogCdcValue, BinlogCdcValue> implements CheckpointedFunction {
    private static final Logger logger = LoggerFactory.getLogger(BinlogCheckpointProcessFunction.class);
    private transient ValueState<BinlogOffsetValue> binlogOffsetValueValueState;
    private transient MapState<String, Date> lastMapKeySyncDateState;
    private transient boolean hasValueChangedAfterSnapshot = false;
    private transient Jedis jedis;
    private final RedisConfig redisConfig;
    private final BinlogOffsetValue binlogOffsetValue;

    public BinlogCheckpointProcessFunction(RedisConfig redisConfig, BinlogOffsetValue binlogOffsetValue) {
        this.redisConfig = redisConfig;
        this.binlogOffsetValue = binlogOffsetValue;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        try {
            this.binlogOffsetValueValueState = getRuntimeContext().getState(new ValueStateDescriptor("mysql-binlog-offset-instance", BinlogOffsetValue.class));
            try {
                this.lastMapKeySyncDateState = getRuntimeContext().getMapState(new MapStateDescriptor("mysql-binlog-mp-full-table-sync-date-state", String.class, Date.class));
            } catch (Exception e) {
                e.printStackTrace();
                throw new BadEventException("FullTableMap状态初始化异常", e);
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            throw new BadEventException("程序处理出现异常", e2);
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.jedis != null) {
            this.jedis.close();
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        BinlogOffsetValue binlogOffsetValue;
        if (this.binlogOffsetValueValueState == null || !this.hasValueChangedAfterSnapshot || (binlogOffsetValue = (BinlogOffsetValue) this.binlogOffsetValueValueState.value()) == null || !StringUtils.hasLength(binlogOffsetValue.getFile())) {
            return;
        }
        for (int i = 0; i < 10; i++) {
            try {
                if (this.jedis == null) {
                    this.jedis = JedisPoolFactory.getNewJedisResource(this.redisConfig);
                }
                HashMap hashMap = new HashMap();
                hashMap.put(StartupOptionUtil.REDIS_MS_BINLOG_FILE_NAME_OFFSET_BINLOG_FILE_NAME, binlogOffsetValue.getFile());
                hashMap.put(StartupOptionUtil.REDIS_MS_BINLOG_FILE_NAME_OFFSET_BINLOG_FILE_NAME_LAST_TABLE, String.format("%s:%s", binlogOffsetValue.getNote(), new Date()));
                hashMap.put(StartupOptionUtil.REDIS_MS_BINLOG_FILE_NAME_OFFSET_BINLOG_OFFSET_VALUE, String.valueOf(binlogOffsetValue.getOffset()));
                if (StringUtils.hasLength(binlogOffsetValue.getGtIds())) {
                    hashMap.put(StartupOptionUtil.REDIS_MS_BINLOG_GT_IDS_VALUE, binlogOffsetValue.getGtIds());
                }
                this.jedis.hset(StartupOptionUtil.REDIS_MS_BINLOG_FILE_NAME_OFFSET, hashMap);
                this.hasValueChangedAfterSnapshot = false;
                return;
            } catch (Exception e) {
                if (i >= 9) {
                    throw e;
                }
                if (ExceptionUtils.getStackTrace(e).contains("JedisConnectionException")) {
                    if (this.jedis != null) {
                        try {
                            this.jedis.close();
                        } catch (Exception e2) {
                        }
                        this.jedis = JedisPoolFactory.getNewJedisResource(this.redisConfig);
                    }
                    logger.error("系统重新链接并尝试重试:{}-{}", Integer.valueOf(i), e);
                } else {
                    logger.error("其他其他异常: 系统将复用同一个连接进行重试:{}-{}", Integer.valueOf(i), e);
                }
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.lastMapKeySyncDateState == null || this.lastMapKeySyncDateState.keys() == null) {
            return;
        }
        Iterator it = this.lastMapKeySyncDateState.keys().iterator();
        String str = null;
        while (true) {
            String str2 = str;
            if (!it.hasNext()) {
                return;
            }
            if (StringUtils.hasLength(str2)) {
                throw new BadEventException(String.format("该分区状态中存在多个key(fullTable)的情况(current=%s;new=%s); 请检查程序进行修复", str2, it.next()));
            }
            str = (String) it.next();
        }
    }

    public void processElement(BinlogCdcValue binlogCdcValue, KeyedProcessFunction<String, BinlogCdcValue, BinlogCdcValue>.Context context, Collector<BinlogCdcValue> collector) throws Exception {
        StringBuilder sb = new StringBuilder();
        try {
            sb.append("开始接收binlog的数据信息");
            collector.collect(binlogCdcValue);
            sb.append("开始存储binlogOffsetValueState的信息");
            if (binlogCdcValue.getBinlogOffsetValue() != null) {
                this.binlogOffsetValueValueState.update(binlogCdcValue.getBinlogOffsetValue());
            }
            sb.append("完成存储binlogOffsetValueState的信息");
            this.hasValueChangedAfterSnapshot = true;
        } catch (Exception e) {
            e.printStackTrace();
            Logger logger2 = logger;
            Object[] objArr = new Object[3];
            objArr[0] = sb;
            objArr[1] = binlogCdcValue == null ? "NULL" : binlogCdcValue.getValue();
            objArr[2] = e;
            logger2.error("BinlogCheckpointProcessFunction.processElement执行失败:{}; 数据={} 异常:{}", objArr);
        }
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((BinlogCdcValue) obj, (KeyedProcessFunction<String, BinlogCdcValue, BinlogCdcValue>.Context) context, (Collector<BinlogCdcValue>) collector);
    }
}
