/*
 * Decompiled with CFR 0.152.
 */
package com.bcxin.tenant.data.etc.table.tasks;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.core.utils.PropertyUtils;
import com.bcxin.flink.streaming.cores.utils.StorageUtil;
import com.bcxin.tenant.data.etc.table.tasks.components.ExtractTableFieldFromDb;
import com.bcxin.tenant.data.etc.table.tasks.components.ExtractV5MappingIdFromAccountFunction;
import com.bcxin.tenant.data.etc.table.tasks.components.httpsink.CustomHttpPostRequestCallback;
import com.bcxin.tenant.data.etc.table.tasks.components.httpsink.WebHttpSinkWriter;
import com.bcxin.tenant.data.etc.table.tasks.utils.HookConfigParseUtils;
import com.bcxin.tenant.data.etc.table.tasks.utils.JwtUtil;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigDefinition;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigSourceDefinition;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigSourceType;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.HttpSink;
import com.getindata.connectors.http.HttpSinkBuilder;
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public class EtcTableApp {
    private static final Logger logger = LoggerFactory.getLogger(EtcTableApp.class);

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new BadEventException("etc-table-\u5fc5\u987b\u4f20\u9012\u73af\u5883\u4fe1\u606f\u548c\u8f68\u8ff9\u914d\u7f6e\u6587\u4ef6\u5305\u540d\u79f0(configs/\u73af\u5883/\u5e95\u4e0b\u7684\u6587\u4ef6\u540d)");
        }
        ParameterTool parameterTool = ParameterTool.fromArgs((String[])args);
        String env = parameterTool.get("env");
        String configFile = parameterTool.get("configFile");
        PropertyUtils.loadProperties((String)String.format("configs/%s/db.properties", env));
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.ofMebiBytes((long)512L));
        if (parameterTool != null) {
            List configOptions = Stream.of(JobManagerOptions.PORT, JobManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.NUM_TASK_SLOTS, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.MANAGED_MEMORY_SIZE, TaskManagerOptions.MANAGED_MEMORY_SIZE, RestOptions.PORT).collect(Collectors.toList());
            for (ConfigOption selectedConfigOption : configOptions) {
                String optionValue = parameterTool.get(selectedConfigOption.key());
                if (!StringUtils.hasLength((String)optionValue)) continue;
                configuration.set(selectedConfigOption, (Object)Integer.parseInt(optionValue));
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        String businessTypePath = "sql-webhook";
        String stateStoragePath = String.format("file:///%s", StorageUtil.getPath((String)String.format("%s-state", businessTypePath)));
        rocksDBStateBackend.setDbStoragePath(stateStoragePath);
        executionEnvironment.setStateBackend((StateBackend)rocksDBStateBackend);
        String checkpointPath = String.format("file:///%s", StorageUtil.getPath((String)businessTypePath));
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(checkpointPath);
        logger.error("WebHook.etc.checkpoint\u7684PointStorage\u4f4d\u7f6e={};", (Object)checkpointPath);
        executionEnvironment.enableCheckpointing(5000L);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)executionEnvironment);
        tEnv.createTemporaryFunction("extractV5MappingId", (UserDefinedFunction)new ExtractV5MappingIdFromAccountFunction());
        tEnv.createTemporaryFunction("extractFieldFromDbTable", (UserDefinedFunction)new ExtractTableFieldFromDb(System.getProperties()));
        WebHookConfigDefinition configDefinition = HookConfigParseUtils.parse(env, configFile);
        Collection<String> definitionSql = configDefinition.getHookConfigSourceSql();
        for (String sql : definitionSql) {
            tEnv.executeSql(sql);
        }
        String executeScript = configDefinition.getExecuteDefinition().getScript();
        logger.error("\u5f53\u524d\u7684SQL:{}", (Object)executeScript);
        RichSinkFunction<String> sinkFunction = EtcTableApp.generateRichSinkFunction(configDefinition);
        if (sinkFunction != null) {
            Table table = tEnv.sqlQuery(executeScript);
            JsonProviderImpl jsonProvider = new JsonProviderImpl();
            SingleOutputStreamOperator dataStream = tEnv.toChangelogStream(table).map(arg_0 -> EtcTableApp.lambda$main$9e9e3ace$1((JsonProvider)jsonProvider, arg_0)).filter((FilterFunction & Serializable)ii -> ii != null).uid(String.format("data_stream_map_%s", executeScript.hashCode())).name("\u6355\u83b7\u5546\u57ce\u7684\u8ba2\u5355\u4fe1\u606f");
            dataStream.addSink(sinkFunction).name("\u6267\u884cWebHook\u7684Http Sink");
            executionEnvironment.execute(String.format("WebHook-%s", configDefinition.getName()));
        } else {
            tEnv.executeSql(executeScript).print();
        }
        logger.error("\u5b8c\u6210\u6267\u884c");
    }

    private static HttpSink generateSinkFunction(WebHookConfigDefinition configDefinition) {
        Optional<WebHookConfigSourceDefinition> httpSinkConfigDefinitionOptional = configDefinition.getSources().stream().filter(ii -> ii.getType() == WebHookConfigSourceType.HttpSink).findFirst();
        if (!httpSinkConfigDefinitionOptional.isPresent()) {
            return null;
        }
        final WebHookConfigSourceDefinition hookConfigSourceDefinition = httpSinkConfigDefinitionOptional.get();
        HttpSinkBuilder sinkBuilder = HttpSink.builder().setEndpointUrl(hookConfigSourceDefinition.getConf());
        sinkBuilder = hookConfigSourceDefinition.getMapValue("POST") != null ? sinkBuilder.setElementConverter((SchemaLifecycleAwareElementConverter)new SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>(){

            public void open(Sink.InitContext context) {
            }

            public HttpSinkRequestEntry apply(String s, SinkWriter.Context context) {
                return new HttpSinkRequestEntry(hookConfigSourceDefinition.getMapValue("POST"), s.getBytes(StandardCharsets.UTF_8));
            }
        }) : sinkBuilder.setElementConverter((SchemaLifecycleAwareElementConverter)new SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>(){

            public void open(Sink.InitContext context) {
            }

            public HttpSinkRequestEntry apply(String s, SinkWriter.Context context) {
                return new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8));
            }
        });
        sinkBuilder.setHttpPostRequestCallback((HttpPostRequestCallback)new CustomHttpPostRequestCallback());
        sinkBuilder = sinkBuilder.setProperty("gid.connector.http.sink.request.batch.size", "200").setProperty("sink.flush-buffer.timeout", "5000").setProperty("gid.connector.http.sink.writer.request.mode", "batch");
        if (!CollectionUtils.isEmpty(hookConfigSourceDefinition.getExtendMap())) {
            for (String key : hookConfigSourceDefinition.getExtendMap().keySet()) {
                String headValue = hookConfigSourceDefinition.getMapValue(key);
                if ("gid.connector.http.sink.user.id".equalsIgnoreCase(key)) {
                    sinkBuilder = sinkBuilder.setProperty("gid.connector.http.sink.header.accessToken", JwtUtil.getToken(headValue));
                    continue;
                }
                sinkBuilder = sinkBuilder.setProperty(key, headValue);
            }
        }
        sinkBuilder = sinkBuilder.setProperty("gid.connector.http.sink.header.Authorization", String.format("Bearer %s", "xxxxx"));
        HttpSink sink = sinkBuilder.build();
        return sink;
    }

    private static RichSinkFunction<String> generateRichSinkFunction(WebHookConfigDefinition configDefinition) {
        Optional<WebHookConfigSourceDefinition> httpSinkConfigDefinitionOptional = configDefinition.getSources().stream().filter(ii -> ii.getType() == WebHookConfigSourceType.HttpSink).findFirst();
        if (!httpSinkConfigDefinitionOptional.isPresent()) {
            return null;
        }
        final WebHookConfigSourceDefinition webHookConfigSourceDefinition = httpSinkConfigDefinitionOptional.get();
        RichSinkFunction<String> sinkFunction = new RichSinkFunction<String>(){
            private WebHttpSinkWriter webHttpSinkWriter;

            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.webHttpSinkWriter = new WebHttpSinkWriter(5000, 10000, webHookConfigSourceDefinition);
            }

            public void close() throws Exception {
                this.webHttpSinkWriter.close();
                super.close();
            }

            public void invoke(String value, SinkFunction.Context context) throws Exception {
                this.webHttpSinkWriter.write(value, null);
            }
        };
        return sinkFunction;
    }

    private static Properties loadProperties(String propertyFile) throws IOException {
        Properties properties = System.getProperties();
        ClassLoader classLoader = PropertyUtils.class.getClassLoader();
        try (InputStream coreStream = classLoader.getResourceAsStream(propertyFile);){
            properties.load(coreStream);
        }
        return properties;
    }

    private static /* synthetic */ String lambda$main$9e9e3ace$1(JsonProvider jsonProvider, Row ii) throws Exception {
        if (ii.getKind() == RowKind.UPDATE_BEFORE) {
            return null;
        }
        Set fieldNames = ii.getFieldNames(true);
        HashMap<String, Object> data = new HashMap<String, Object>();
        for (String fN : fieldNames) {
            Object value = ii.getField(fN);
            if (value == null) continue;
            data.put(fN, ii.getField(fN));
        }
        String json = jsonProvider.getJson(data);
        return json;
    }
}

