package com.bcxin.tenant.data.etc.table.tasks.components.httpsink;

import com.alibaba.fastjson.JSONObject;
import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.tenant.data.etc.table.tasks.utils.JwtUtil;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigSourceDefinition;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/bcxin/tenant/data/etc/table/tasks/components/httpsink/WebHttpSinkWriter.class */
public class WebHttpSinkWriter implements SinkWriter<String> {
    private static final Logger logger = LoggerFactory.getLogger(WebHttpSinkWriter.class);
    private final List<String> bulkRequests = new ArrayList();
    private transient CloseableHttpClient httpClient;
    private final int batchSize;
    private final int batchIntervalMs;
    private final JsonProvider jsonProvider;
    private final WebHookConfigSourceDefinition configSourceDefinition;

    public WebHttpSinkWriter(int i, int i2, WebHookConfigSourceDefinition webHookConfigSourceDefinition) {
        this.httpClient = null;
        this.configSourceDefinition = webHookConfigSourceDefinition;
        if (i < 10) {
            this.batchSize = 500;
        } else {
            this.batchSize = i;
        }
        if (i2 < 500) {
            this.batchIntervalMs = HttpExecutionOptions.DEFAULT_SIZE;
        } else {
            this.batchIntervalMs = i2;
        }
        this.jsonProvider = new JsonProviderImpl();
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try {
                flush(true);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("执行数据推送发生异常", e);
            }
        }, i2, i2, TimeUnit.MICROSECONDS);
        this.httpClient = HttpClients.createDefault();
    }

    public synchronized void write(String str, SinkWriter.Context context) throws IOException, InterruptedException {
        this.bulkRequests.add(str);
        if (this.batchSize <= 0 || this.batchSize > this.bulkRequests.size()) {
        }
        flush(true);
    }

    public synchronized void flush(boolean z) {
        if (CollectionUtils.isEmpty(this.bulkRequests)) {
            return;
        }
        try {
            doExecute(this.configSourceDefinition.getConf(), this.bulkRequests);
            this.bulkRequests.clear();
            logger.error("成功提交完毕并清楚bulkRequests={}", Integer.valueOf(this.bulkRequests.size()));
        } catch (Exception e) {
            throw new BadEventException("执行Http Sink发生异常", e);
        }
    }

    public void close() throws Exception {
        this.httpClient.close();
    }

    private void doExecute(String str, List<String> list) throws IOException {
        String str2 = null;
        try {
            str2 = this.jsonProvider.getJson((Collection) list.stream().map(str3 -> {
                return (JSONObject) this.jsonProvider.toObject(JSONObject.class, str3);
            }).collect(Collectors.toList()));
            HttpPost httpPost = new HttpPost(str);
            StringEntity stringEntity = new StringEntity(str2, "UTF-8");
            stringEntity.setContentEncoding(StandardCharsets.UTF_8.toString());
            stringEntity.setContentType("application/json");
            if (!CollectionUtils.isEmpty(this.configSourceDefinition.getExtendMap())) {
                for (String str4 : this.configSourceDefinition.getExtendMap().keySet()) {
                    String mapValue = this.configSourceDefinition.getMapValue(str4);
                    if (WebHookConfigSourceDefinition.CURRENT_REQUEST_USER_ID.equalsIgnoreCase(str4)) {
                        httpPost.setHeader(WebHookConfigSourceDefinition.REQUEST_HEADER_ACCESS_TOKEN_ORIGINAL, JwtUtil.getToken(mapValue));
                    }
                }
            }
            httpPost.setEntity(stringEntity);
            CloseableHttpResponse execute = this.httpClient.execute(httpPost);
            try {
                StatusLine statusLine = execute.getStatusLine();
                if (statusLine.getStatusCode() != 200) {
                    logger.error("{}: 执行结束(总数量={}):{} 失败-状态:{} 请求-{}; 响应:{}", new Object[]{Long.valueOf(Thread.currentThread().getId()), Integer.valueOf(this.bulkRequests.size()), str, Integer.valueOf(statusLine.getStatusCode()), str2, EntityUtils.toString(execute.getEntity())});
                } else {
                    logger.error("{}: 成功-执行结束(总数量={}):{} Ok-响应:{}", new Object[]{Long.valueOf(Thread.currentThread().getId()), Integer.valueOf(this.bulkRequests.size()), str, Integer.valueOf(statusLine.getStatusCode())});
                }
                if (execute != null) {
                    execute.close();
                }
            } finally {
            }
        } catch (Exception e) {
            logger.error("WebHttpSink调用(body={})发生异常:{}", str2, ExceptionUtils.getMessage(e));
            throw e;
        }
    }

    public List<String> getBulkRequests() {
        return this.bulkRequests;
    }

    public CloseableHttpClient getHttpClient() {
        return this.httpClient;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getBatchIntervalMs() {
        return this.batchIntervalMs;
    }

    public JsonProvider getJsonProvider() {
        return this.jsonProvider;
    }

    public WebHookConfigSourceDefinition getConfigSourceDefinition() {
        return this.configSourceDefinition;
    }
}
