package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.dtos.HttpDomainRegionDTO;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.CdcSourceMeta;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.HttpRegionSinkProperty;
import com.bcxin.flink.streaming.cores.JdbcJobExecutorUtil;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/compnents/WebHttpSinkWriter.class */
public class WebHttpSinkWriter implements SinkWriter<String> {
    private static final Logger logger = LoggerFactory.getLogger(WebHttpSinkWriter.class);
    private final List<WebContentParameterDTO> bulkRequests = new ArrayList();
    private transient CloseableHttpClient httpClient;
    private final int batchSize;
    private final int batchIntervalMs;
    private final JsonProvider jsonProvider;
    private final Collection<HttpRegionSinkProperty> httpRegionSinkProperties;
    private final String kafkaCdcTopicPrefix;
    private final Connection connection;

    /* loaded from: input_file:com/bcxin/flink/cdc/kafka/source/task/compnents/WebHttpSinkWriter$WebContentParameterDTO.class */
    public static class WebContentParameterDTO implements Serializable {
        private String topic;
        private String key;
        private String value;
        private String domainId;

        public static WebContentParameterDTO create(String str, String str2, String str3, String str4) {
            WebContentParameterDTO webContentParameterDTO = new WebContentParameterDTO();
            webContentParameterDTO.setKey(str2);
            webContentParameterDTO.setTopic(str);
            webContentParameterDTO.setValue(str3);
            webContentParameterDTO.setDomainId(str4);
            return webContentParameterDTO;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getKey() {
            return this.key;
        }

        public String getValue() {
            return this.value;
        }

        public String getDomainId() {
            return this.domainId;
        }

        public void setTopic(String str) {
            this.topic = str;
        }

        public void setKey(String str) {
            this.key = str;
        }

        public void setValue(String str) {
            this.value = str;
        }

        public void setDomainId(String str) {
            this.domainId = str;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof WebContentParameterDTO)) {
                return false;
            }
            WebContentParameterDTO webContentParameterDTO = (WebContentParameterDTO) obj;
            if (!webContentParameterDTO.canEqual(this)) {
                return false;
            }
            String topic = getTopic();
            String topic2 = webContentParameterDTO.getTopic();
            if (topic == null) {
                if (topic2 != null) {
                    return false;
                }
            } else if (!topic.equals(topic2)) {
                return false;
            }
            String key = getKey();
            String key2 = webContentParameterDTO.getKey();
            if (key == null) {
                if (key2 != null) {
                    return false;
                }
            } else if (!key.equals(key2)) {
                return false;
            }
            String value = getValue();
            String value2 = webContentParameterDTO.getValue();
            if (value == null) {
                if (value2 != null) {
                    return false;
                }
            } else if (!value.equals(value2)) {
                return false;
            }
            String domainId = getDomainId();
            String domainId2 = webContentParameterDTO.getDomainId();
            return domainId == null ? domainId2 == null : domainId.equals(domainId2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof WebContentParameterDTO;
        }

        public int hashCode() {
            String topic = getTopic();
            int hashCode = (1 * 59) + (topic == null ? 43 : topic.hashCode());
            String key = getKey();
            int hashCode2 = (hashCode * 59) + (key == null ? 43 : key.hashCode());
            String value = getValue();
            int hashCode3 = (hashCode2 * 59) + (value == null ? 43 : value.hashCode());
            String domainId = getDomainId();
            return (hashCode3 * 59) + (domainId == null ? 43 : domainId.hashCode());
        }

        public String toString() {
            return "WebHttpSinkWriter.WebContentParameterDTO(topic=" + getTopic() + ", key=" + getKey() + ", value=" + getValue() + ", domainId=" + getDomainId() + ")";
        }
    }

    public WebHttpSinkWriter(int i, int i2, Collection<HttpRegionSinkProperty> collection, String str, Connection connection) {
        this.httpClient = null;
        this.httpRegionSinkProperties = collection;
        this.kafkaCdcTopicPrefix = str;
        if (i < 10) {
            this.batchSize = 500;
        } else {
            this.batchSize = i;
        }
        if (i2 < 500) {
            this.batchIntervalMs = 5000;
        } else {
            this.batchIntervalMs = i2;
        }
        this.jsonProvider = new JsonProviderImpl();
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try {
                flush(true);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("执行数据推送发生异常", e);
            }
        }, i2, i2, TimeUnit.MICROSECONDS);
        this.connection = connection;
        this.httpClient = HttpClients.createDefault();
    }

    public void write(String str, SinkWriter.Context context) throws IOException, InterruptedException {
        CdcSourceMeta cdcSourceMeta = (CdcSourceMeta) new JsonProviderImpl().toObject(CdcSourceMeta.class, str);
        if (cdcSourceMeta == null || cdcSourceMeta.getDbName() == null) {
            logger.error("当前无效数据:{}", str);
            throw new IllegalArgumentException(str);
        }
        String id = cdcSourceMeta.getId();
        this.bulkRequests.add(WebContentParameterDTO.create(getKafkaCdcTopicPrefix().replace("[dbName]", cdcSourceMeta.getDbName()).replace("[tableName]", cdcSourceMeta.getTableName()), id, str, cdcSourceMeta.getDomainId()));
        if (this.batchSize <= 0 || this.batchSize > this.bulkRequests.size()) {
            return;
        }
        flush(true);
    }

    public void flush(boolean z) throws IOException, InterruptedException {
        if (CollectionUtils.isEmpty(this.bulkRequests)) {
            return;
        }
        for (HttpDomainRegionDTO httpDomainRegionDTO : JdbcJobExecutorUtil.getDomainRegionDTOs((Collection) this.bulkRequests.stream().filter(webContentParameterDTO -> {
            return StringUtils.hasLength(webContentParameterDTO.getDomainId());
        }).map(webContentParameterDTO2 -> {
            return webContentParameterDTO2.getDomainId();
        }).distinct().collect(Collectors.toList()), this.connection)) {
            if (StringUtils.hasLength(httpDomainRegionDTO.getDomainId())) {
                Collection<HttpRegionSinkProperty> collection = (Collection) getHttpRegionSinkProperties().stream().filter(httpRegionSinkProperty -> {
                    return StringUtils.hasLength(httpRegionSinkProperty.getRegionIdPrefix()) && httpDomainRegionDTO.getRegionId().startsWith(httpRegionSinkProperty.getRegionIdPrefix());
                }).collect(Collectors.toList());
                if (CollectionUtils.isEmpty(collection)) {
                    logger.error("可忽略: 当前组织（{}）的数据由于无配置区域信息; 因此, 无法推送到对应子站", httpDomainRegionDTO.getDomainId());
                    collection = Collections.singleton(getHttpRegionSinkProperties().stream().findFirst().get());
                }
                List<WebContentParameterDTO> list = (List) this.bulkRequests.stream().filter(webContentParameterDTO3 -> {
                    return StringUtils.hasLength(webContentParameterDTO3.getDomainId()) && webContentParameterDTO3.getDomainId().equalsIgnoreCase(httpDomainRegionDTO.getDomainId());
                }).collect(Collectors.toList());
                for (HttpRegionSinkProperty httpRegionSinkProperty2 : collection) {
                    if (StringUtils.hasLength(httpDomainRegionDTO.getRegionId()) && !com.bcxin.event.core.utils.StringUtils.hasAlpha(httpDomainRegionDTO.getRegionId())) {
                        for (WebContentParameterDTO webContentParameterDTO4 : list) {
                            webContentParameterDTO4.setKey(String.format("%s#no_region#%s#%s", webContentParameterDTO4.getKey(), httpDomainRegionDTO.getRegionId(), httpDomainRegionDTO.getSuperviseDepartId()));
                        }
                    }
                    doExecute(httpRegionSinkProperty2.getApi(), list);
                }
            } else {
                logger.error("当前组织（{}）的数据组织信息无效", httpDomainRegionDTO.getDomainId());
            }
        }
    }

    public void close() throws Exception {
        if (this.connection != null && !this.connection.isClosed()) {
            this.connection.close();
        }
        this.httpClient.close();
    }

    private void doExecute(String str, List<WebContentParameterDTO> list) throws IOException {
        String str2 = null;
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("items", list);
            str2 = this.jsonProvider.getJson(hashMap);
            HttpPost httpPost = new HttpPost(str);
            StringEntity stringEntity = new StringEntity(str2, "UTF-8");
            stringEntity.setContentEncoding(StandardCharsets.UTF_8.toString());
            stringEntity.setContentType("application/json");
            httpPost.setEntity(stringEntity);
            CloseableHttpResponse execute = this.httpClient.execute(httpPost);
            try {
                StatusLine statusLine = execute.getStatusLine();
                if (statusLine.getStatusCode() != 200) {
                    logger.error("{}: 执行结束(总数量={}):{} 失败-状态:{} 请求-{}; 响应:{}", new Object[]{Long.valueOf(Thread.currentThread().getId()), Integer.valueOf(this.bulkRequests.size()), str, Integer.valueOf(statusLine.getStatusCode()), str2, EntityUtils.toString(execute.getEntity())});
                } else {
                    logger.error("{}: 成功-执行结束(总数量={}):{} Ok-响应:{}", new Object[]{Long.valueOf(Thread.currentThread().getId()), Integer.valueOf(this.bulkRequests.size()), str, Integer.valueOf(statusLine.getStatusCode())});
                }
                if (execute != null) {
                    execute.close();
                }
            } finally {
            }
        } catch (Exception e) {
            logger.error("WebHttpSink调用(body={})发生异常", str2, e);
        }
    }

    public List<WebContentParameterDTO> getBulkRequests() {
        return this.bulkRequests;
    }

    public CloseableHttpClient getHttpClient() {
        return this.httpClient;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getBatchIntervalMs() {
        return this.batchIntervalMs;
    }

    public JsonProvider getJsonProvider() {
        return this.jsonProvider;
    }

    public Collection<HttpRegionSinkProperty> getHttpRegionSinkProperties() {
        return this.httpRegionSinkProperties;
    }

    public String getKafkaCdcTopicPrefix() {
        return this.kafkaCdcTopicPrefix;
    }

    public Connection getConnection() {
        return this.connection;
    }
}
