package com.bcxin.tenant.backend;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.Infrastructures.components.JsonProvider;
import com.bcxin.Infrastructures.exceptions.BadTenantException;
import com.bcxin.api.interfaces.tenants.OrganizationStatisticsRpcProvider;
import com.bcxin.api.interfaces.tenants.requests.organizationRelationship.OrganizationStatisticsCreateRequest;
import com.bcxin.tenant.backend.constants.KafkaConstants;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

@Configuration
@ConditionalOnProperty("spring.kafka.bootstrap-servers")
@EnableKafka
public class KafkaConfig {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.listener.ack-mode}")
    private ContainerProperties.AckMode ackMode;

    private final JsonProvider jsonProvider;
    private final OrganizationStatisticsRpcProvider organizationStatisticsRpcProvider;

    public KafkaConfig(JsonProvider jsonProvider,
                       OrganizationStatisticsRpcProvider organizationStatisticsRpcProvider) {
        this.jsonProvider = jsonProvider;
        this.organizationStatisticsRpcProvider = organizationStatisticsRpcProvider;
    }

    /**
     * Declares the organization-relationship topic and its dead-letter topic so that
     * {@link KafkaAdmin} creates them on startup if they do not already exist.
     *
     * @return the topic definitions registered with the broker
     */
    @Bean
    public KafkaAdmin.NewTopics topics() {
        int partitionCount = 5;
        int replicaCount = 1;
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name(KafkaConstants.ORGANIZATION_RELATIONSHIP_TOPIC)
                        .partitions(partitionCount)
                        .replicas(replicaCount)
                        .build(),
                TopicBuilder.name(KafkaConstants.ORGANIZATION_RELATIONSHIP_DEAD_LETER_TOPIC)
                        .partitions(partitionCount)
                        .replicas(replicaCount)
                        .build());
    }

    /**
     * Builds the batch listener container factory used by {@code @KafkaListener} methods
     * in this class: manual ack mode (from configuration), concurrency of 2, and the
     * retrying batch error handler that eventually routes failures to the dead-letter topic.
     *
     * @param errorHandler the batch error handler bean defined below
     * @return the configured listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            RetryingBatchErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setPollTimeout(60_000);
        containerProperties.setAckMode(ackMode);
        factory.setBatchListener(true);
        factory.setAutoStartup(true);
        factory.setConcurrency(2);

        factory.setBatchErrorHandler(errorHandler);
        // Fail fast at startup when the subscribed topics are missing from the broker.
        factory.setMissingTopicsFatal(true);

        return factory;
    }

    /**
     * Consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return a String/String consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties sourced from the Spring environment.
     *
     * @return the consumer configuration map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 512);

        /**
         * https://www.coder.work/article/7748413
         */
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);

        return props;
    }

    /**
     * Error handler that retries a failed batch a fixed number of times and then publishes
     * the offending record to the dead-letter topic (same partition as the source record).
     * Records already on the dead-letter topic are never republished, avoiding loops.
     *
     * @param kafkaTemplate template used by the dead-letter recoverer to publish records
     * @return the retrying batch error handler
     */
    @Bean
    public RetryingBatchErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
        final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver =
                (cr, e) -> new TopicPartition(KafkaConstants.ORGANIZATION_RELATIONSHIP_DEAD_LETER_TOPIC, cr.partition());
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver) {
                    @Override
                    public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Exception exception) {
                        try {
                            // Guard against re-publishing records that already live on the DLQ.
                            if (!KafkaConstants.ORGANIZATION_RELATIONSHIP_DEAD_LETER_TOPIC.equalsIgnoreCase(record.topic())) {
                                super.accept(record, consumer, exception);
                            }
                        } catch (Exception ex) {
                            // Log the publish failure WITH its cause; the original code
                            // discarded `ex` and logged only topic/key afterwards.
                            logger.error("send data to dead letter queue resulted in an error:topic={};key={};",
                                    record.topic(), record.key(), ex);
                        }
                    }
                };

        // 5 immediate retries (no back-off delay) before the batch is recovered to the DLQ.
        return new RetryingBatchErrorHandler(new FixedBackOff(0, 5), deadLetterPublishingRecoverer);
    }

    /**
     * Batch listener for CDC events on the organization-relationship topic.
     * Only relationship creations (records whose "before" payload is absent) contribute
     * organization ids, for which statistics rows are created via the RPC provider.
     * The batch is acknowledged only after successful processing; on failure a
     * {@link BadTenantException} is thrown so the error handler can retry / dead-letter.
     *
     * @param records        the polled batch of raw JSON change events
     * @param acknowledgment manual offset acknowledgment for the batch
     */
    @KafkaListener(id = "${spring.kafka.consumer.group-id}-relationship",
            topics = {KafkaConstants.ORGANIZATION_RELATIONSHIP_TOPIC},
            groupId = "${spring.kafka.consumer.group-id}-relationship")
    public void ackOrgRelationshipListener(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        if (records.isEmpty()) {
            return;
        }
        try {
            List<String> orgIds = records.stream()
                    .map(record -> {
                        JSONObject recordObject = jsonProvider.toObject(JSONObject.class, record.value());
                        JSONObject before = recordObject.getJSONObject("before");
                        JSONObject after = recordObject.getJSONObject("after");
                        // Only relationship creation ("before" is null) triggers a statistics record.
                        return before == null ? after.getString("selected_organization_id") : null;
                    })
                    .filter(id -> id != null)
                    .distinct()
                    .collect(Collectors.toList());

            if (!orgIds.isEmpty()) {
                organizationStatisticsRpcProvider.createOrgStatisticsById(
                        OrganizationStatisticsCreateRequest.create(orgIds));
            }
            // Commit offsets only once the whole batch has been processed successfully.
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the stack trace is logged
            // (the original used a "{}" placeholder, which swallowed the trace).
            logger.error("监听新增组织关系表时发生异常", e);
            // Rethrow so the RetryingBatchErrorHandler retries and eventually dead-letters.
            throw new BadTenantException(
                    String.format("Failed to consume topic= %s.",
                            KafkaConstants.ORGANIZATION_RELATIONSHIP_TOPIC),
                    e);
        }
    }
}
