/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.cdcs.interceptors;

import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.job.core.domain.dtos.RedisConfig;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.RedisOutputFormat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CustomProducerInterceptor
implements ProducerInterceptor<byte[], byte[]> {
    private transient RedisOutputFormat redisOutputFormat;
    public static final String REDIS_CONFIG_KEY_HOST = "REDIS_CONFIG_KEY_HOST";
    public static final String REDIS_CONFIG_KEY_PORT = "REDIS_CONFIG_KEY_PORT";
    public static final String REDIS_CONFIG_KEY_PASSWORD = "REDIS_CONFIG_KEY_PASSWORD";

    public ProducerRecord onSend(ProducerRecord<byte[], byte[]> record) {
        String value = new String((byte[])record.value());
        try {
            this.redisOutputFormat.writeRecord(value);
        }
        catch (IOException e) {
            throw new BadEventException("\u5c06\u6570\u636e\u8ba1\u7b97\u5230Redis\u53d1\u751f\u5f02\u5e38", (Exception)e);
        }
        return record;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    public void close() {
        try {
            this.redisOutputFormat.close();
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void configure(Map<String, ?> configs) {
        if (!(configs.containsKey(REDIS_CONFIG_KEY_HOST) && configs.containsKey(REDIS_CONFIG_KEY_PORT) && configs.containsKey(REDIS_CONFIG_KEY_PASSWORD))) {
            throw new BadEventException(String.format("%s\u914d\u7f6e\u4e0d\u80fd\u4e3a\u7a7a", REDIS_CONFIG_KEY_HOST));
        }
        try {
            int port = Integer.parseInt((String)configs.get(REDIS_CONFIG_KEY_PORT));
            RedisConfig redisConfig = RedisConfig.create((String)((String)configs.get(REDIS_CONFIG_KEY_HOST)), (int)port, (int)30000, (String)((String)configs.get(REDIS_CONFIG_KEY_PASSWORD)));
            this.redisOutputFormat = new RedisOutputFormat(redisConfig);
            this.redisOutputFormat.open(1, 1);
        }
        catch (Exception ex) {
            throw new BadEventException("\u521d\u59cb\u5316Redis");
        }
    }
}

