/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.flink.cdc.kafka.source.task.compnents;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.dtos.HttpDomainRegionDTO;
import com.bcxin.event.core.utils.StringUtils;
import com.bcxin.flink.cdc.kafka.source.task.cdcs.CdcSourceMeta;
import com.bcxin.flink.cdc.kafka.source.task.proerpties.HttpRegionSinkProperty;
import com.bcxin.flink.streaming.cores.JdbcJobExecutorUtil;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Flink {@link SinkWriter} that buffers CDC records (JSON strings) and forwards them in batches
 * to HTTP endpoints selected by the region of the record's domain.
 *
 * <p>Records are buffered by {@link #write(String, SinkWriter.Context)} and delivered by
 * {@link #flush(boolean)}, which runs when the buffer reaches the batch size, when the caller
 * invokes it, and periodically from a background timer thread.
 *
 * <p>Thread-safety: the buffer, the JDBC connection and delivery are guarded by a single internal
 * lock because the timer thread and the writer thread both trigger flushes.
 *
 * <p>Delivery is best-effort: a failed HTTP call is logged and the affected records are not
 * retried.
 */
public class WebHttpSinkWriter
implements SinkWriter<String> {
    private static final Logger logger = LoggerFactory.getLogger(WebHttpSinkWriter.class);
    /** Records waiting for the next flush; only touched while holding {@link #flushLock}. */
    private final List<WebContentParameterDTO> bulkRequests = new ArrayList<>();
    private final CloseableHttpClient httpClient;
    private final int batchSize;
    private final int batchIntervalMs;
    private final JsonProvider jsonProvider;
    /** Same instance as {@link #jsonProvider}, kept under its concrete type for parsing records. */
    private final JsonProviderImpl elementParser;
    private final Collection<HttpRegionSinkProperty> httpRegionSinkProperties;
    private final String kafkaCdcTopicPrefix;
    private final Connection connection;
    /** Timer that triggers the periodic flush; shut down in {@link #close()}. */
    private final ScheduledExecutorService flushScheduler;
    /** Serialises buffer access and delivery between the writer thread and the timer thread. */
    private final Object flushLock = new Object();

    /**
     * Creates the writer and starts the periodic flush timer.
     *
     * @param batchSize buffer size that triggers a flush; values below 10 are replaced by 500
     * @param batchIntervalMs period of the background flush in milliseconds; values below 500 are
     *     replaced by 5000
     * @param httpRegionSinkProperties candidate HTTP targets, matched by region-id prefix
     * @param kafkaCdcTopicPrefix topic template containing the {@code [dbName]} and
     *     {@code [tableName]} placeholders
     * @param connection JDBC connection used to resolve domain regions; closed by {@link #close()}
     */
    public WebHttpSinkWriter(int batchSize, int batchIntervalMs, Collection<HttpRegionSinkProperty> httpRegionSinkProperties, String kafkaCdcTopicPrefix, Connection connection) {
        this.httpRegionSinkProperties = httpRegionSinkProperties;
        this.kafkaCdcTopicPrefix = kafkaCdcTopicPrefix;
        this.batchSize = batchSize < 10 ? 500 : batchSize;
        this.batchIntervalMs = batchIntervalMs < 500 ? 5000 : batchIntervalMs;
        JsonProviderImpl provider = new JsonProviderImpl();
        this.jsonProvider = provider;
        this.elementParser = provider;
        this.connection = connection;
        this.httpClient = HttpClients.createDefault();
        // Start the timer last so that it never observes a partially initialised writer, and use
        // the normalised interval: a raw value <= 0 would be rejected by scheduleAtFixedRate.
        this.flushScheduler = Executors.newSingleThreadScheduledExecutor();
        this.flushScheduler.scheduleAtFixedRate(this::flushFromTimer, this.batchIntervalMs, this.batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Parses one CDC record, derives its topic from the topic template and buffers it; flushes
     * when the buffer has reached the batch size.
     *
     * @param element the CDC record as JSON
     * @param context sink context (unused)
     * @throws IllegalArgumentException if the record cannot be parsed or lacks a database or table
     *     name
     */
    public void write(String element, SinkWriter.Context context) throws IOException, InterruptedException {
        synchronized (this.flushLock) {
            CdcSourceMeta schema = (CdcSourceMeta)this.elementParser.toObject(CdcSourceMeta.class, element);
            // A null table name would otherwise surface as an NPE from String.replace below.
            if (schema == null || schema.getDbName() == null || schema.getTableName() == null) {
                logger.error("\u5f53\u524d\u65e0\u6548\u6570\u636e:{}", (Object)element);
                throw new IllegalArgumentException(element);
            }
            String key = schema.getId();
            String topic = this.getKafkaCdcTopicPrefix().replace("[dbName]", schema.getDbName()).replace("[tableName]", schema.getTableName());
            this.bulkRequests.add(WebContentParameterDTO.create(topic, key, element, schema.getDomainId()));
            if (this.batchSize <= this.bulkRequests.size()) {
                this.flush(true);
            }
        }
    }

    /**
     * Delivers every buffered record to the HTTP targets of its domain's region and then empties
     * the buffer.
     *
     * <p>Records whose domain id is empty, or whose domain is not returned by the region lookup,
     * cannot be routed and are discarded with a warning. If the region lookup itself throws, the
     * buffer is left untouched so that the next flush can retry.
     *
     * @param endOfInput ignored
     * @throws IllegalStateException if records need delivering but no HTTP target is configured
     */
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        synchronized (this.flushLock) {
            if (this.bulkRequests.isEmpty()) {
                return;
            }
            int total = this.bulkRequests.size();
            int routed = 0;
            List<String> domainIds = this.bulkRequests.stream()
                    .map(item -> item.getDomainId())
                    .filter(id -> WebHttpSinkWriter.hasLength(id))
                    .distinct()
                    .collect(Collectors.toList());
            // Without any domain id nothing can be routed, so skip the database round trip.
            if (!domainIds.isEmpty()) {
                Collection<HttpDomainRegionDTO> domainRegions = JdbcJobExecutorUtil.getDomainRegionDTOs(domainIds, this.connection);
                if (domainRegions != null) {
                    for (HttpDomainRegionDTO domainRegion : domainRegions) {
                        routed += this.deliverDomain(domainRegion);
                    }
                }
            }
            if (routed < total) {
                logger.warn("{} of {} buffered records had no resolvable domain region and were discarded", total - routed, total);
            }
            // Everything routable has been handed to doExecute; drop the batch so that it is not
            // re-sent (and its keys re-suffixed) by the next flush.
            this.bulkRequests.clear();
        }
    }

    /**
     * Stops the flush timer and releases the JDBC connection and the HTTP client. The HTTP client
     * is closed even if closing the connection fails.
     */
    public void close() throws Exception {
        // Cancels future timer runs; a flush already in progress finishes because we wait on the lock.
        this.flushScheduler.shutdown();
        synchronized (this.flushLock) {
            try {
                if (this.connection != null && !this.connection.isClosed()) {
                    this.connection.close();
                }
            }
            finally {
                this.httpClient.close();
            }
        }
    }

    /** Timer entry point: flushes and logs failures so that the periodic schedule keeps running. */
    private void flushFromTimer() {
        try {
            this.flush(true);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.error("\u6267\u884c\u6570\u636e\u63a8\u9001\u53d1\u751f\u5f02\u5e38", (Throwable)ex);
        }
        catch (Exception ex) {
            logger.error("\u6267\u884c\u6570\u636e\u63a8\u9001\u53d1\u751f\u5f02\u5e38", (Throwable)ex);
        }
    }

    /**
     * Sends the buffered records of one domain to every HTTP target resolved for its region.
     *
     * @return the number of buffered records that belong to the domain
     */
    private int deliverDomain(HttpDomainRegionDTO domainRegion) throws IOException {
        String domainId = domainRegion.getDomainId();
        if (!WebHttpSinkWriter.hasLength(domainId)) {
            logger.error("\u5f53\u524d\u7ec4\u7ec7\uff08{}\uff09\u7684\u6570\u636e\u7ec4\u7ec7\u4fe1\u606f\u65e0\u6548", (Object)domainId);
            return 0;
        }
        List<WebContentParameterDTO> selected = this.bulkRequests.stream()
                .filter(item -> WebHttpSinkWriter.hasLength(item.getDomainId()) && item.getDomainId().equalsIgnoreCase(domainId))
                .collect(Collectors.toList());
        if (selected.isEmpty()) {
            return 0;
        }
        // Resolve the targets before touching any key so that a configuration error leaves the
        // buffered records unmodified.
        Collection<HttpRegionSinkProperty> targets = this.resolveRegionSinks(domainRegion);
        String regionId = domainRegion.getRegionId();
        if (WebHttpSinkWriter.hasLength(regionId) && !StringUtils.hasAlpha(regionId)) {
            // Tag the key exactly once per record, independent of the number of targets.
            for (WebContentParameterDTO item : selected) {
                item.setKey(String.format("%s#no_region#%s#%s", item.getKey(), regionId, domainRegion.getSuperviseDepartId()));
            }
        }
        for (HttpRegionSinkProperty target : targets) {
            this.doExecute(target.getApi(), selected);
        }
        return selected.size();
    }

    /**
     * Returns the configured targets whose region-id prefix matches the domain's region, or the
     * first configured target when none matches.
     *
     * @throws IllegalStateException if no target is configured at all
     */
    private Collection<HttpRegionSinkProperty> resolveRegionSinks(HttpDomainRegionDTO domainRegion) {
        Collection<HttpRegionSinkProperty> configured = this.getHttpRegionSinkProperties();
        if (CollectionUtils.isEmpty(configured)) {
            throw new IllegalStateException("No HTTP region sink is configured; buffered records cannot be delivered");
        }
        String regionId = domainRegion.getRegionId();
        List<HttpRegionSinkProperty> matched = new ArrayList<>();
        if (regionId != null) {
            for (HttpRegionSinkProperty candidate : configured) {
                if (WebHttpSinkWriter.hasLength(candidate.getRegionIdPrefix()) && regionId.startsWith(candidate.getRegionIdPrefix())) {
                    matched.add(candidate);
                }
            }
        }
        if (!matched.isEmpty()) {
            return matched;
        }
        logger.error("\u53ef\u5ffd\u7565: \u5f53\u524d\u7ec4\u7ec7\uff08{}\uff09\u7684\u6570\u636e\u7531\u4e8e\u65e0\u914d\u7f6e\u533a\u57df\u4fe1\u606f; \u56e0\u6b64, \u65e0\u6cd5\u63a8\u9001\u5230\u5bf9\u5e94\u5b50\u7ad9", (Object)domainRegion.getDomainId());
        return Collections.singleton(configured.iterator().next());
    }

    /**
     * POSTs the given records to {@code api} as a JSON object with a single {@code items} array.
     * Any failure is logged and swallowed (best-effort delivery).
     */
    private void doExecute(String api, List<WebContentParameterDTO> parameters) throws IOException {
        String body = null;
        try {
            HashMap<String, List<WebContentParameterDTO>> requestItems = new HashMap<>();
            requestItems.put("items", parameters);
            body = this.jsonProvider.getJson(requestItems);
            HttpPost post = new HttpPost(api);
            StringEntity httpEntity = new StringEntity(body, "UTF-8");
            // NOTE(review): this emits a "Content-Encoding: UTF-8" header, which is not a content
            // coding; kept unchanged in case the receiving endpoint relies on it.
            httpEntity.setContentEncoding(StandardCharsets.UTF_8.toString());
            httpEntity.setContentType("application/json");
            post.setEntity((HttpEntity)httpEntity);
            try (CloseableHttpResponse response = this.httpClient.execute((HttpUriRequest)post)) {
                StatusLine statusLine = response.getStatusLine();
                if (statusLine.getStatusCode() != 200) {
                    HttpEntity responseEntity = response.getEntity();
                    String content = responseEntity == null ? "" : EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
                    logger.error("{}: \u6267\u884c\u7ed3\u675f(\u603b\u6570\u91cf={}):{} \u5931\u8d25-\u72b6\u6001:{} \u8bf7\u6c42-{}; \u54cd\u5e94:{}", Thread.currentThread().getId(), parameters.size(), api, statusLine.getStatusCode(), body, content);
                } else {
                    logger.info("{}: \u6210\u529f-\u6267\u884c\u7ed3\u675f(\u603b\u6570\u91cf={}):{} Ok-\u54cd\u5e94:{}", Thread.currentThread().getId(), parameters.size(), api, statusLine.getStatusCode());
                }
            }
        }
        catch (Exception ex) {
            logger.error("WebHttpSink\u8c03\u7528(body={})\u53d1\u751f\u5f02\u5e38", body, ex);
        }
    }

    /** Null-safe "non-empty" check shared by the routing code. */
    private static boolean hasLength(String value) {
        return org.springframework.util.StringUtils.hasLength(value);
    }

    /**
     * Returns the live buffer of pending records. The list is modified by the writer and timer
     * threads under an internal lock, so callers should treat it as read-only diagnostics.
     */
    public List<WebContentParameterDTO> getBulkRequests() {
        return this.bulkRequests;
    }

    public CloseableHttpClient getHttpClient() {
        return this.httpClient;
    }

    /** Returns the effective batch size (after normalisation in the constructor). */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** Returns the effective flush interval in milliseconds (after normalisation). */
    public int getBatchIntervalMs() {
        return this.batchIntervalMs;
    }

    public JsonProvider getJsonProvider() {
        return this.jsonProvider;
    }

    public Collection<HttpRegionSinkProperty> getHttpRegionSinkProperties() {
        return this.httpRegionSinkProperties;
    }

    public String getKafkaCdcTopicPrefix() {
        return this.kafkaCdcTopicPrefix;
    }

    public Connection getConnection() {
        return this.connection;
    }

    /**
     * One buffered record as it is serialised into the {@code items} array of the HTTP request:
     * target topic, message key, raw JSON value and the owning domain id.
     */
    public static class WebContentParameterDTO
    implements Serializable {
        private String topic;
        private String key;
        private String value;
        private String domainId;

        /** Creates a populated instance. */
        public static WebContentParameterDTO create(String topic, String key, String value, String domainId) {
            WebContentParameterDTO parameter = new WebContentParameterDTO();
            parameter.setKey(key);
            parameter.setTopic(topic);
            parameter.setValue(value);
            parameter.setDomainId(domainId);
            return parameter;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getKey() {
            return this.key;
        }

        public String getValue() {
            return this.value;
        }

        public String getDomainId() {
            return this.domainId;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public void setValue(String value) {
            this.value = value;
        }

        public void setDomainId(String domainId) {
            this.domainId = domainId;
        }

        /** Two instances are equal when topic, key, value and domain id are all equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof WebContentParameterDTO)) {
                return false;
            }
            WebContentParameterDTO other = (WebContentParameterDTO)o;
            if (!other.canEqual(this)) {
                return false;
            }
            return WebContentParameterDTO.sameText(this.getTopic(), other.getTopic())
                    && WebContentParameterDTO.sameText(this.getKey(), other.getKey())
                    && WebContentParameterDTO.sameText(this.getValue(), other.getValue())
                    && WebContentParameterDTO.sameText(this.getDomainId(), other.getDomainId());
        }

        /** Hook that lets subclasses opt out of equality with this type. */
        protected boolean canEqual(Object other) {
            return other instanceof WebContentParameterDTO;
        }

        /** Hash over the same four fields as {@link #equals(Object)}; a null field contributes 43. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + WebContentParameterDTO.hashOf(this.getTopic());
            result = result * 59 + WebContentParameterDTO.hashOf(this.getKey());
            result = result * 59 + WebContentParameterDTO.hashOf(this.getValue());
            result = result * 59 + WebContentParameterDTO.hashOf(this.getDomainId());
            return result;
        }

        @Override
        public String toString() {
            return "WebHttpSinkWriter.WebContentParameterDTO(topic=" + this.getTopic() + ", key=" + this.getKey() + ", value=" + this.getValue() + ", domainId=" + this.getDomainId() + ")";
        }

        /** Null-safe string equality. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one field. */
        private static int hashOf(String text) {
            return text == null ? 43 : text.hashCode();
        }
    }
}

