package com.bcxin.sync.consumer;

import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.bcxin.sync.common.CommonConstant;
import com.bcxin.sync.common.emus.DataOperationType;
import com.bcxin.sync.common.emus.PageType;
import com.bcxin.sync.common.utils.KafkaMessageUtil;
import com.bcxin.sync.dtos.kafka.message.TlkEventProjectSyncMessage;
import com.bcxin.sync.dtos.request.OrganizationOpenRequest;
import com.bcxin.sync.service.datasync.OrganizationOpenService;
import com.bcxin.sync.service.datasync.ThirdPageSettingService;
import com.bcxin.sync.service.tenant.TlkEventProjectService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * 团队项目kafka消息监听器
 */
@Slf4j
@Service
public class TlkEventProjectKafkaConsumer {

    @Autowired
    private OrganizationOpenService organizationOpenService;
    @Autowired
    private TlkEventProjectService tlkEventProjectService;
    @Autowired
    private ThirdPageSettingService thirdPageSettingService;

    @KafkaListener(id = "bcx_data_sync_listener_tlk_event_project_to_open",
            topics = CommonConstant.TLK_EVENT_PROJECT_KAFKA_TOPIC,
            groupId = "bcx_data_sync_tlk_event_project_to_open")
    public void bcxDataSyncTlkEventProjectToOpen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        TlkEventProjectSyncMessage project = KafkaMessageUtil.transferKafkaTlkEventProjectRecord2Message(record);
        log.info("kafka监听tlk_event_project表 to open，数据转化为：{}", JSONObject.toJSONString(project));
        try {
            if (project != null) {
                //开通陀螺匠
                Date now = new Date();
                Date endTime = tlkEventProjectService.getEndTimeByDomainId(project.getDomainId(), now);
                if (endTime.after(now)) {
                    OrganizationOpenRequest request = new OrganizationOpenRequest();
                    request.setOrganizationId(project.getDomainId());
                    request.setOrganizationName(project.getDomainName());
                    request.setOpenType(1);
                    request.setEndTime(endTime);
                    request.setOpenDay((int) DateUtil.between(now, endTime, DateUnit.DAY));
                    organizationOpenService.openThird(request);
                } else {
                    log.info("结束时间<=当前时间，无需同步");
                }
            } else {
                log.info("不是新增活动承办方创建项目，无需同步");
            }
        } catch (Exception e) {
            log.error("kafka监听tlk_event_project表 to open，异常：{}", e.getMessage(), e);
        }
        ack.acknowledge();
    }


    @KafkaListener(id = "bcx_data_sync_listener_tlk_event_project_to_third",
            topics = CommonConstant.TLK_EVENT_PROJECT_KAFKA_TOPIC,
            groupId = "bcx_data_sync_tlk_event_project_to_third")
    public void bcxDataSyncTlkEventProjectToThird(ConsumerRecord<String, String> record, Acknowledgment ack) {
        TlkEventProjectSyncMessage project = KafkaMessageUtil.transferKafkaTlkEventProjectRecord2Message2(record);
        log.info("kafka监听tlk_event_project表 to third，数据转化为：{}", JSONObject.toJSONString(project));
        try {
            if (project != null) {
                //有变更风评报告地址或者赛演3d地址，需要同步
                if (project.getOpType().getCode() == DataOperationType.DELETE.getCode()) {
                    thirdPageSettingService.settingThirdPage(project.getId(), project.getName(), PageType.SY3D.getCode(), null, null, null);
                    thirdPageSettingService.settingThirdPage(project.getId(), project.getName(), PageType.FPBG.getCode(), null, null, null);
                } else {
                    thirdPageSettingService.settingThirdPage(project.getId(), project.getName(), PageType.SY3D.getCode(), project.getLink3d(), project.getBeforeOfficePoliceIdIndex(), project.getAfterOfficePoliceIdIndex());
                    thirdPageSettingService.settingThirdPage(project.getId(), project.getName(), PageType.FPBG.getCode(), project.getAssesmentReport(), project.getBeforeOfficePoliceIdIndex(), project.getAfterOfficePoliceIdIndex());
                }
            } else {
                log.info("没有变更风评报告地址或者赛演3d地址，无需同步");
            }
        } catch (Exception e) {
            log.error("kafka监听tlk_event_project表 to third，异常：{}", e.getMessage(), e);
        }
        ack.acknowledge();
    }
}
