package com.bcxin.flink.cdc.kafka.source.task.cdcs.interceptors;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.RedisOutputFormat;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/cdcs/interceptors/CustomProducerInterceptor.class */
public class CustomProducerInterceptor implements ProducerInterceptor<byte[], byte[]> {
    private transient RedisOutputFormat redisOutputFormat;
    public static final String REDIS_CONFIG_KEY_HOST = "REDIS_CONFIG_KEY_HOST";
    public static final String REDIS_CONFIG_KEY_PORT = "REDIS_CONFIG_KEY_PORT";
    public static final String REDIS_CONFIG_KEY_PASSWORD = "REDIS_CONFIG_KEY_PASSWORD";

    public ProducerRecord onSend(ProducerRecord<byte[], byte[]> producerRecord) {
        try {
            this.redisOutputFormat.writeRecord(new String((byte[]) producerRecord.value()));
            return producerRecord;
        } catch (IOException e) {
            throw new BadEventException("将数据计算到Redis发生异常", e);
        }
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exc) {
    }

    public void close() {
        try {
            this.redisOutputFormat.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void configure(Map<String, ?> map) {
        if (!map.containsKey(REDIS_CONFIG_KEY_HOST) || !map.containsKey(REDIS_CONFIG_KEY_PORT) || !map.containsKey(REDIS_CONFIG_KEY_PASSWORD)) {
            throw new BadEventException(String.format("%s配置不能为空", REDIS_CONFIG_KEY_HOST));
        }
        try {
            this.redisOutputFormat = new RedisOutputFormat(RedisConfig.create((String) map.get(REDIS_CONFIG_KEY_HOST), Integer.parseInt((String) map.get(REDIS_CONFIG_KEY_PORT)), 30000, (String) map.get(REDIS_CONFIG_KEY_PASSWORD)));
            this.redisOutputFormat.open(1, 1);
        } catch (Exception e) {
            throw new BadEventException("初始化Redis");
        }
    }
}
