package com.bcxin.tenant.data.etc.tasks.components;

import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.KafkaConstants;
import com.bcxin.event.core.exceptions.SkipRetryEventException;
import com.bcxin.event.job.core.domain.CacheProvider;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.event.job.core.domain.impls.CacheProviderImpl;
import com.bcxin.tenant.data.etc.tasks.dtos.DtqRecordDto;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.sql.DataSource;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;

/* loaded from: input_file:com/bcxin/tenant/data/etc/tasks/components/CustomJdbcOutputFormat.class */
/**
 * A {@link JdbcOutputFormat} variant that buffers incoming records in memory and writes them
 * with a single prepared statement ({@code sql}) through a {@link DataSource} obtained from
 * {@code DataSourceUtil}, instead of using the connection/executor machinery of the parent class.
 *
 * <p>Behaviour visible in this class:
 * <ul>
 *   <li>Records are collected by {@link #addToBatch(Object, Object)} and written by
 *       {@link #attemptFlush()}, which the parent's {@code flush()} is expected to invoke.</li>
 *   <li>Only records for which {@code BinlogRawValue.isMatchCondition(...)} returns {@code true}
 *       are written; the others are dropped when the batch is cleared.</li>
 *   <li>Records whose statement parameters cannot be bound are forwarded to the Kafka topic
 *       returned by {@code KafkaConstants.getDtqTopic("jdbc_consumer_error")}.</li>
 *   <li>{@code executeBatch()} is attempted up to {@value #MAX_EXECUTE_ATTEMPTS} times.</li>
 *   <li>Failures inside {@link #attemptFlush()} are logged and the batch is cleared anyway, so
 *       a failed batch is not re-delivered by this class.</li>
 * </ul>
 *
 * <p>The instance also acts as a JVM shutdown hook ({@link #run()}) that tries one final flush.
 */
public class CustomJdbcOutputFormat extends JdbcOutputFormat implements Serializable, Runnable {
    private static final Logger logger = LoggerFactory.getLogger(CustomJdbcOutputFormat.class);

    /** Maximum number of {@code executeBatch()} attempts for one flush. */
    private static final int MAX_EXECUTE_ATTEMPTS = 10;

    /** Pause between two {@code executeBatch()} attempts. */
    private static final Duration RETRY_PAUSE = Duration.of(5L, ChronoUnit.MILLIS);

    private final JdbcConnectionOptions jdbcConnectionOptions;
    private volatile DataSource dataSource;
    /** Records buffered since the last flush; elements are cast to {@code BinlogRawValue} on flush. */
    private final List<Object> batch;
    private final CustomJdbcAcceptPreparedStatementParameter acceptPreparedStatementParameter;
    private final String sql;
    private final JdbcExecutionOptions executionOptions;
    private transient ScheduledExecutorService scheduler;
    private final String bootstrapServer;
    private final RedisConfig redisConfig;
    private final Collection<String> batchOrConditionExpress;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient KafkaOutputFormat kafkaOutputFormat;
    private transient CacheProvider cacheProvider;

    /**
     * Creates the output format and registers it as a JVM shutdown hook.
     *
     * @param collection condition expressions handed to {@code BinlogRawValue.isMatchCondition};
     *     may be {@code null} or empty
     * @param jdbcConnectionOptions driver name, URL and credentials used to obtain the data source
     * @param jdbcExecutionOptions batch size / batch interval settings
     * @param statementExecutorFactory passed through to the parent constructor
     * @param str the SQL text prepared for every flush
     * @param str2 Kafka bootstrap servers used for the error topic
     * @param redisConfig configuration for the {@link CacheProvider} created in {@link #open(int, int)}
     * @param customJdbcAcceptPreparedStatementParameter binds one record to the prepared statement
     * @param recordExtractor passed through to the parent constructor
     */
    public CustomJdbcOutputFormat(Collection<String> collection, @Nonnull JdbcConnectionOptions jdbcConnectionOptions, @Nonnull JdbcExecutionOptions jdbcExecutionOptions, @Nonnull JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory, @Nonnull String str, @Nonnull String str2, RedisConfig redisConfig, CustomJdbcAcceptPreparedStatementParameter customJdbcAcceptPreparedStatementParameter, @Nonnull JdbcOutputFormat.RecordExtractor recordExtractor) {
        super(new SimpleJdbcConnectionProvider(jdbcConnectionOptions), jdbcExecutionOptions, statementExecutorFactory, recordExtractor);
        this.batchOrConditionExpress = collection;
        this.jdbcConnectionOptions = jdbcConnectionOptions;
        this.batch = new ArrayList<>();
        this.sql = str;
        this.acceptPreparedStatementParameter = customJdbcAcceptPreparedStatementParameter;
        this.executionOptions = jdbcExecutionOptions;
        this.bootstrapServer = str2;
        this.redisConfig = redisConfig;
        // NOTE(review): this publishes `this` before construction completes and registers the hook
        // on the instance built here, not on any copy produced by Java deserialization - confirm
        // that this is the instance that actually receives records.
        Runtime.getRuntime().addShutdownHook(new Thread(this));
    }

    /** Delegates to the parent implementation. */
    public void configure(Configuration configuration) {
        super.configure(configuration);
    }

    /**
     * Prepares the data source, the Kafka error output, the cache provider and, when both a batch
     * interval and a batch size other than 1 are configured, a periodic flush task.
     *
     * <p>The parent {@code open} is intentionally not invoked here (it was not in the original).
     *
     * @param i forwarded to {@code KafkaOutputFormat.open}
     * @param i2 forwarded to {@code KafkaOutputFormat.open}
     * @throws IOException declared by the overridden signature
     */
    public void open(int i, int i2) throws IOException {
        this.dataSource = DataSourceUtil.getDataSource(
                this.jdbcConnectionOptions.getDriverName(),
                this.jdbcConnectionOptions.getDbURL(),
                this.jdbcConnectionOptions.getUsername().get(),
                this.jdbcConnectionOptions.getPassword().get());
        this.kafkaOutputFormat = new KafkaOutputFormat(this.bootstrapServer, KafkaConstants.getDtqTopic("jdbc_consumer_error"), 100, 5000);
        this.kafkaOutputFormat.open(i, i2);

        // Create the cache provider before the periodic flush is scheduled so that a flush can
        // never observe a null provider, and so that a failing constructor does not leave a
        // running scheduler behind.
        this.cacheProvider = new CacheProviderImpl(this.redisConfig);

        long batchIntervalMs = this.executionOptions.getBatchIntervalMs();
        if (batchIntervalMs != 0 && this.executionOptions.getBatchSize() != 1) {
            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                // NOTE(review): the lock is the class object, so periodic flushes of all instances
                // in this JVM are serialized; kept as in the original.
                synchronized (CustomJdbcOutputFormat.class) {
                    try {
                        flush();
                    } catch (Exception e) {
                        // Never let an exception escape: it would cancel all future executions.
                        logger.error("Periodic JDBC flush failed", e);
                    }
                }
            }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Delegates to the parent {@code flush()} while holding this instance's monitor. */
    public synchronized void flush() throws IOException {
        super.flush();
    }

    /**
     * Cancels the periodic flush, shuts the scheduler down and closes the cache provider.
     *
     * <p>NOTE(review): buffered records are not flushed here and {@code kafkaOutputFormat} is not
     * closed; both were also the case before this change - confirm whether that is intended.
     */
    public synchronized void close() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
        if (this.scheduler != null) {
            // Without this the scheduler's worker thread outlives the output format.
            this.scheduler.shutdown();
        }
        if (this.cacheProvider != null) {
            this.cacheProvider.close();
        }
    }

    /** Always returns {@code null}; connections are taken from {@link #dataSource} per flush. */
    public Connection getConnection() {
        return null;
    }

    /** Intentionally a no-op; this class does not use the parent's statement executor. */
    public void updateExecutor(boolean z) throws SQLException, ClassNotFoundException {
    }

    /**
     * Buffers the extracted record.
     *
     * @param obj the original record (unused)
     * @param obj2 the extracted record that is stored in the batch
     */
    protected void addToBatch(Object obj, Object obj2) throws SQLException {
        this.batch.add(obj2);
    }

    /**
     * Writes all buffered records that match the condition expressions and then clears the batch.
     *
     * <p>Any {@link Exception} raised while writing is logged and swallowed, and the batch is
     * cleared regardless. An {@link Error} propagates and leaves the batch untouched.
     */
    protected void attemptFlush() throws SQLException {
        if (this.batch.isEmpty()) {
            return;
        }
        StringBuilder report = new StringBuilder();
        report.append(String.format("v3-当前batch的数据量=%s;", this.batch.size()));
        StopWatch stopWatch = new StopWatch();

        JsonProviderImpl jsonProvider = new JsonProviderImpl();
        List<BinlogRawValue> matched = new ArrayList<>();
        String lastUnmatchedResult = null;
        for (Object item : this.batch) {
            BinlogRawValue rawValue = (BinlogRawValue) item;
            if (rawValue.isMatchCondition(jsonProvider, this.batchOrConditionExpress)) {
                matched.add(rawValue);
            } else {
                lastUnmatchedResult = rawValue.getConditionExecuteResult();
            }
        }

        stopWatch.start();
        try {
            if (matched.isEmpty()) {
                report.append(String.format("总共有%s/%s条数据不符合condition条件[lasExpression=%s];",
                        this.batch.size() - matched.size(), this.batch.size(), lastUnmatchedResult));
            } else {
                report.append(String.format("开始JDBC操作[%s/%s]:", matched.size(), this.batch.size()));
                try (Connection connection = this.dataSource.getConnection()) {
                    connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                    report.append(executeSql(connection, matched));
                }
            }
        } catch (Exception e) {
            logger.error(report.toString(), e);
        } finally {
            stopWatch.stop();
            report.append(String.format("总共耗时=%s ms", stopWatch.getTotalTimeMillis()));
            logger.info(report.toString());
        }
        this.batch.clear();
    }

    /**
     * Prepares {@code sql}, binds every record and executes the resulting JDBC batch.
     *
     * @param connection connection used to prepare the statement; not closed here
     * @param collection records to write
     * @return a human-readable execution report
     * @throws SQLException if preparing the statement fails or the batch cannot be executed
     */
    private String executeSql(Connection connection, Collection<BinlogRawValue> collection) throws SQLException {
        StringBuilder report = new StringBuilder();
        try (PreparedStatement statement = connection.prepareStatement(this.sql)) {
            StopWatch bindWatch = new StopWatch();
            bindWatch.start("准备参数信息");
            String expressions = CollectionUtils.isEmpty(this.batchOrConditionExpress)
                    ? "NULL"
                    : String.join(";", this.batchOrConditionExpress);
            report.append(String.format("执行的表达式:%s;", expressions));

            StringBuilder boundIds = new StringBuilder();
            boolean hasBoundRecords = bindParameters(statement, collection, report, boundIds);
            bindWatch.stop();
            report.append(String.format("参数(%s)耗时:%s ms", boundIds, bindWatch.getTotalTimeMillis()));

            if (hasBoundRecords) {
                executeBatchWithRetry(statement, collection, report);
            }
            return report.toString();
        }
    }

    /**
     * Binds each record to the statement and adds it to the JDBC batch; records that fail are
     * routed to the dead-letter topic instead.
     *
     * @return {@code true} if at least one record was added to the JDBC batch
     */
    private boolean bindParameters(PreparedStatement statement, Collection<BinlogRawValue> records, StringBuilder report, StringBuilder boundIds) {
        boolean bound = false;
        for (BinlogRawValue rawValue : records) {
            try {
                this.acceptPreparedStatementParameter.accept(statement, rawValue, this.cacheProvider);
                statement.addBatch();
                bound = true;
                boundIds.append(String.format("%s,", rawValue.getReadyPkId()));
            } catch (Exception e) {
                sendToDeadLetterQueue(rawValue, e, report);
            }
        }
        return bound;
    }

    /** Publishes a record that could not be bound to the Kafka error topic and notes it in the report. */
    private void sendToDeadLetterQueue(BinlogRawValue rawValue, Exception cause, StringBuilder report) {
        try {
            String sourceDb = (String) rawValue.getReadyParameter("source.db");
            String sourceTable = (String) rawValue.getReadyParameter("source.table");
            String pkId = String.valueOf(rawValue.getReadyPkId());
            // The trailing throwable makes SLF4J print the stack trace of the binding failure.
            logger.error("跟踪：数据id={}，表名={},添加失败", pkId, sourceTable, cause);
            this.kafkaOutputFormat.writeRecord(DtqRecordDto.create(sourceDb, sourceTable, pkId, new String(rawValue.getValue())));
            report.append(String.format("可忽略: 跳过异常并加入死信队列: 无效的jdbc参数信息; 导致该数据无法正常更新到db:%s", cause.toString()));
        } catch (Exception dlqError) {
            // The record is lost at this point, so make sure it shows up in the error log.
            logger.error("Failed to publish a record to the dead-letter topic", dlqError);
            report.append(String.format("消息(%s)推送到死信队列发生失败:%s", new String(rawValue.getValue()), dlqError.toString()));
        }
    }

    /**
     * Executes the JDBC batch, retrying up to {@link #MAX_EXECUTE_ATTEMPTS} times.
     *
     * <p>No retry is made when the statement is already closed or when the stack trace contains
     * {@code "Incorrect"} (reported as {@link SkipRetryEventException}). When the last attempt
     * fails the exception is rethrown.
     *
     * <p>NOTE(review): some JDBC drivers clear the statement's batch even when
     * {@code executeBatch()} fails; in that case a retry executes an empty batch - verify against
     * the driver in use.
     */
    private void executeBatchWithRetry(PreparedStatement statement, Collection<BinlogRawValue> records, StringBuilder report) throws SQLException {
        StringBuilder retryErrors = new StringBuilder();
        for (int attempt = 0; attempt < MAX_EXECUTE_ATTEMPTS; attempt++) {
            try {
                StopWatch executeWatch = new StopWatch();
                executeWatch.start("开始执行存储过程");
                int[] affected = statement.executeBatch();
                executeWatch.stop();
                report.append(String.format("(执行完毕[数量=%s]:)", records.size()));
                report.append(String.format("存储过程总共耗时=%s ms", executeWatch.getTotalTimeMillis()));
                if (affected != null) {
                    report.append(String.format("其中总共有%s条受影响;", Arrays.stream(affected).filter(count -> count == 1).count()));
                }
                if (records.size() > 0) {
                    try {
                        BinlogRawValue first = records.stream().findFirst().orElse(null);
                        if (first != null) {
                            // Logged at error level in the original as a tracking aid; kept as is.
                            logger.error("跟踪v5={}：表名={},statement={}执行成功", report, (String) first.getReadyParameter("source.table"), statement);
                        }
                    } catch (Exception traceError) {
                        logger.error("跟踪v5执行异常={} statement={}", report, statement, traceError);
                    }
                }
                if (retryErrors.length() > 0) {
                    report.append(String.format("重试后(当前index=%s)-最终执行成功: jdbc脚本(%s) 重试时错误信息如下:%s", attempt, this.sql, retryErrors));
                } else {
                    report.append(String.format("归集执行成功: sql=%s;", this.sql));
                }
                // Success: leave the retry loop.
                break;
            } catch (Exception e) {
                logger.error("执行失败的statement={}，exception={},尝试次数={}", statement, e, attempt);
                if ((e instanceof SQLException) && e.toString().contains("java.sql.SQLException: No operations allowed after statement closed.")) {
                    throw e;
                }
                if (ExceptionUtils.getStackTrace(e).contains("Incorrect")) {
                    logger.error("【不进行重试-v2】执行({})数据库发生异常(参数异常):{}", this.sql, e);
                    throw new SkipRetryEventException(e.getMessage(), e);
                }
                if (attempt >= MAX_EXECUTE_ATTEMPTS - 1) {
                    // Retries exhausted: surface the failure instead of returning as if nothing happened.
                    logger.error("jdbc脚本({}) 重试({}次数)之后还是执行异常：{}", this.sql, attempt + 1, e);
                    throw e;
                }
                retryErrors.append(String.format("可暂时忽略(当前重试索引次数=%s): 将进行10次的重试 %s", attempt, e));
                try {
                    ThreadUtils.sleep(RETRY_PAUSE);
                } catch (Exception sleepError) {
                    if (sleepError instanceof InterruptedException) {
                        // Preserve the interrupt for the caller and stop retrying.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                    // Any other failure of the pause itself is not a reason to give up retrying.
                }
            }
        }
    }

    /**
     * Shutdown-hook entry point: attempts a final flush and then logs whether it succeeded
     * together with whatever is still buffered.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by {@link #flush()}
     */
    @Override // java.lang.Runnable
    public void run() {
        boolean flushed = false;
        try {
            flush();
            flushed = true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            logger.error("系统出现非预期的行为导致异常: 可能丢失的数据为:{}, 数据大小={}", flushed, describePendingBatch());
        }
    }

    /** Renders the records still held in the batch for the shutdown log line. */
    private StringBuilder describePendingBatch() {
        StringBuilder pending = new StringBuilder();
        if (this.batch == null) {
            pending.append("暂无数据");
        } else {
            pending.append(String.format("批量数据=%s", this.batch.stream()
                    .filter(item -> item != null)
                    .map(item -> String.valueOf(item))
                    .collect(Collectors.joining(";"))));
        }
        return pending;
    }
}
