/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.job.core.domain.JedisPoolFactory;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogCdcValue;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.BinlogOffsetValue;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;

public class BinlogCheckpointProcessFunction
extends KeyedProcessFunction<String, BinlogCdcValue, BinlogCdcValue>
implements CheckpointedFunction {
    private static final Logger logger = LoggerFactory.getLogger(BinlogCheckpointProcessFunction.class);
    private transient ValueState<BinlogOffsetValue> binlogOffsetValueValueState;
    private transient MapState<String, Date> lastMapKeySyncDateState;
    private transient boolean hasValueChangedAfterSnapshot = false;
    private transient Jedis jedis;
    private final RedisConfig redisConfig;
    private final BinlogOffsetValue binlogOffsetValue;

    public BinlogCheckpointProcessFunction(RedisConfig redisConfig, BinlogOffsetValue binlogOffsetValue) {
        this.redisConfig = redisConfig;
        this.binlogOffsetValue = binlogOffsetValue;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            ValueStateDescriptor binlogOffsetValueValueStateDescriptor = new ValueStateDescriptor("mysql-binlog-offset-instance", BinlogOffsetValue.class);
            this.binlogOffsetValueValueState = this.getRuntimeContext().getState(binlogOffsetValueValueStateDescriptor);
        }
        catch (Exception ex) {
            ex.printStackTrace();
            throw new BadEventException("\u7a0b\u5e8f\u5904\u7406\u51fa\u73b0\u5f02\u5e38", ex);
        }
        try {
            MapStateDescriptor lastFullTableSyncDateStateDescriptor = new MapStateDescriptor("mysql-binlog-mp-full-table-sync-date-state", String.class, Date.class);
            this.lastMapKeySyncDateState = this.getRuntimeContext().getMapState(lastFullTableSyncDateStateDescriptor);
        }
        catch (Exception ex) {
            ex.printStackTrace();
            throw new BadEventException("FullTableMap\u72b6\u6001\u521d\u59cb\u5316\u5f02\u5e38", ex);
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.jedis != null) {
            this.jedis.close();
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        BinlogOffsetValue offsetValue;
        if (this.binlogOffsetValueValueState != null && this.hasValueChangedAfterSnapshot && (offsetValue = (BinlogOffsetValue)this.binlogOffsetValueValueState.value()) != null && StringUtils.hasLength((String)offsetValue.getFile())) {
            for (int index = 0; index < 10; ++index) {
                try {
                    if (this.jedis == null) {
                        this.jedis = JedisPoolFactory.getNewJedisResource((RedisConfig)this.redisConfig);
                    }
                    HashMap<String, String> binlogMp = new HashMap<String, String>();
                    binlogMp.put("redis_ms_binlog_file_name_offset_file_name", offsetValue.getFile());
                    binlogMp.put("redis_ms_binlog_file_name_offset_file_name_last_table", String.format("%s:%s", offsetValue.getNote(), new Date()));
                    binlogMp.put("redis_ms_binlog_file_name_offset_offset_value", String.valueOf(offsetValue.getOffset()));
                    if (StringUtils.hasLength((String)offsetValue.getGtIds())) {
                        binlogMp.put("redis_ms_binlog_gt_ids_value", offsetValue.getGtIds());
                    }
                    this.jedis.hset("mysql:flink:checkpoints:cdc:redis_ms_binlog_file_name_offset_gt_ids", binlogMp);
                    this.hasValueChangedAfterSnapshot = false;
                    return;
                }
                catch (Exception ex) {
                    if (index >= 9) {
                        throw ex;
                    }
                    if (ExceptionUtils.getStackTrace((Throwable)ex).contains("JedisConnectionException")) {
                        if (this.jedis != null) {
                            try {
                                this.jedis.close();
                            }
                            catch (Exception exception) {
                                // empty catch block
                            }
                            this.jedis = JedisPoolFactory.getNewJedisResource((RedisConfig)this.redisConfig);
                        }
                        logger.error("\u7cfb\u7edf\u91cd\u65b0\u94fe\u63a5\u5e76\u5c1d\u8bd5\u91cd\u8bd5:{}-{}", (Object)index, (Object)ex);
                        continue;
                    }
                    logger.error("\u5176\u4ed6\u5176\u4ed6\u5f02\u5e38: \u7cfb\u7edf\u5c06\u590d\u7528\u540c\u4e00\u4e2a\u8fde\u63a5\u8fdb\u884c\u91cd\u8bd5:{}-{}", (Object)index, (Object)ex);
                    continue;
                }
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.lastMapKeySyncDateState != null && this.lastMapKeySyncDateState.keys() != null) {
            Iterator keys = this.lastMapKeySyncDateState.keys().iterator();
            String getKey = null;
            while (keys.hasNext()) {
                if (StringUtils.hasLength(getKey)) {
                    throw new BadEventException(String.format("\u8be5\u5206\u533a\u72b6\u6001\u4e2d\u5b58\u5728\u591a\u4e2akey(fullTable)\u7684\u60c5\u51b5(current=%s;new=%s); \u8bf7\u68c0\u67e5\u7a0b\u5e8f\u8fdb\u884c\u4fee\u590d", getKey, keys.next()));
                }
                getKey = (String)keys.next();
            }
        }
    }

    public void processElement(BinlogCdcValue value, KeyedProcessFunction.Context ctx, Collector<BinlogCdcValue> out) throws Exception {
        StringBuilder sb = new StringBuilder();
        try {
            sb.append("\u5f00\u59cb\u63a5\u6536binlog\u7684\u6570\u636e\u4fe1\u606f");
            out.collect((Object)value);
            sb.append("\u5f00\u59cb\u5b58\u50a8binlogOffsetValueState\u7684\u4fe1\u606f");
            if (value.getBinlogOffsetValue() != null) {
                this.binlogOffsetValueValueState.update((Object)value.getBinlogOffsetValue());
            }
            sb.append("\u5b8c\u6210\u5b58\u50a8binlogOffsetValueState\u7684\u4fe1\u606f");
            this.hasValueChangedAfterSnapshot = true;
        }
        catch (Exception ex) {
            ex.printStackTrace();
            logger.error("BinlogCheckpointProcessFunction.processElement\u6267\u884c\u5931\u8d25:{}; \u6570\u636e={} \u5f02\u5e38:{}", new Object[]{sb, value == null ? "NULL" : value.getValue(), ex});
        }
    }
}

