package com.bcxin.tenant.data.etc.table.tasks;

import com.bcxin.event.core.JsonProvider;
import com.bcxin.event.core.JsonProviderImpl;
import com.bcxin.event.core.exceptions.BadEventException;
import com.bcxin.event.core.utils.PropertyUtils;
import com.bcxin.flink.streaming.cores.utils.StorageUtil;
import com.bcxin.tenant.data.etc.table.tasks.components.ExtractTableFieldFromDb;
import com.bcxin.tenant.data.etc.table.tasks.components.ExtractV5MappingIdFromAccountFunction;
import com.bcxin.tenant.data.etc.table.tasks.components.httpsink.CustomHttpPostRequestCallback;
import com.bcxin.tenant.data.etc.table.tasks.components.httpsink.HttpExecutionOptions;
import com.bcxin.tenant.data.etc.table.tasks.components.httpsink.WebHttpSinkWriter;
import com.bcxin.tenant.data.etc.table.tasks.utils.HookConfigParseUtils;
import com.bcxin.tenant.data.etc.table.tasks.utils.JwtUtil;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigDefinition;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigSourceDefinition;
import com.bcxin.tenant.data.etc.table.tasks.webhookConfigs.WebHookConfigSourceType;
import com.getindata.connectors.http.HttpSink;
import com.getindata.connectors.http.HttpSinkBuilder;
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/bcxin/tenant/data/etc/table/tasks/EtcTableApp.class */
public class EtcTableApp {
    private static final Logger logger = LoggerFactory.getLogger(EtcTableApp.class);

    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 2) {
            throw new BadEventException("etc-table-必须传递环境信息和轨迹配置文件包名称(configs/环境/底下的文件名)");
        }
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        String str = fromArgs.get("env");
        String str2 = fromArgs.get("configFile");
        PropertyUtils.loadProperties(String.format("configs/%s/db.properties", str));
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512L));
        if (fromArgs != null) {
            for (ConfigOption configOption : (List) Stream.of((Object[]) new ConfigOption[]{JobManagerOptions.PORT, JobManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.TOTAL_PROCESS_MEMORY, TaskManagerOptions.NUM_TASK_SLOTS, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.MANAGED_MEMORY_SIZE, TaskManagerOptions.MANAGED_MEMORY_SIZE, RestOptions.PORT}).collect(Collectors.toList())) {
                String str3 = fromArgs.get(configOption.key());
                if (StringUtils.hasLength(str3)) {
                    configuration.set(configOption, Integer.valueOf(Integer.parseInt(str3)));
                }
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        embeddedRocksDBStateBackend.setDbStoragePath(String.format("file:///%s", StorageUtil.getPath(String.format("%s-state", "sql-webhook"))));
        executionEnvironment.setStateBackend(embeddedRocksDBStateBackend);
        String format = String.format("file:///%s", StorageUtil.getPath("sql-webhook"));
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(format);
        logger.error("WebHook.etc.checkpoint的PointStorage位置={};", format);
        executionEnvironment.enableCheckpointing(5000L);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        create.createTemporaryFunction("extractV5MappingId", new ExtractV5MappingIdFromAccountFunction());
        create.createTemporaryFunction("extractFieldFromDbTable", new ExtractTableFieldFromDb(System.getProperties()));
        WebHookConfigDefinition parse = HookConfigParseUtils.parse(str, str2);
        Iterator<String> it = parse.getHookConfigSourceSql().iterator();
        while (it.hasNext()) {
            create.executeSql(it.next());
        }
        String script = parse.getExecuteDefinition().getScript();
        logger.error("当前的SQL:{}", script);
        RichSinkFunction<String> generateRichSinkFunction = generateRichSinkFunction(parse);
        if (generateRichSinkFunction != null) {
            Table sqlQuery = create.sqlQuery(script);
            JsonProviderImpl jsonProviderImpl = new JsonProviderImpl();
            create.toChangelogStream(sqlQuery).map(row -> {
                if (row.getKind() == RowKind.UPDATE_BEFORE) {
                    return null;
                }
                Set<String> fieldNames = row.getFieldNames(true);
                HashMap hashMap = new HashMap();
                for (String str4 : fieldNames) {
                    if (row.getField(str4) != null) {
                        hashMap.put(str4, row.getField(str4));
                    }
                }
                return jsonProviderImpl.getJson(hashMap);
            }).filter(str4 -> {
                return str4 != null;
            }).uid(String.format("data_stream_map_%s", Integer.valueOf(script.hashCode()))).name("捕获商城的订单信息").addSink(generateRichSinkFunction).name("执行WebHook的Http Sink");
            executionEnvironment.execute("执行商城的WebHook的推送服务");
        } else {
            create.executeSql(script).print();
        }
        logger.error("完成执行");
    }

    private static HttpSink generateSinkFunction(WebHookConfigDefinition webHookConfigDefinition) {
        Optional<WebHookConfigSourceDefinition> findFirst = webHookConfigDefinition.getSources().stream().filter(webHookConfigSourceDefinition -> {
            return webHookConfigSourceDefinition.getType() == WebHookConfigSourceType.HttpSink;
        }).findFirst();
        if (!findFirst.isPresent()) {
            return null;
        }
        final WebHookConfigSourceDefinition webHookConfigSourceDefinition2 = findFirst.get();
        HttpSinkBuilder endpointUrl = HttpSink.builder().setEndpointUrl(webHookConfigSourceDefinition2.getConf());
        HttpSinkBuilder elementConverter = webHookConfigSourceDefinition2.getMapValue(WebHookConfigSourceDefinition.INSERT_METHOD) != null ? endpointUrl.setElementConverter(new SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>() { // from class: com.bcxin.tenant.data.etc.table.tasks.EtcTableApp.1
            public void open(Sink.InitContext initContext) {
            }

            public HttpSinkRequestEntry apply(String str, SinkWriter.Context context) {
                return new HttpSinkRequestEntry(WebHookConfigSourceDefinition.this.getMapValue(WebHookConfigSourceDefinition.INSERT_METHOD), str.getBytes(StandardCharsets.UTF_8));
            }
        }) : endpointUrl.setElementConverter(new SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>() { // from class: com.bcxin.tenant.data.etc.table.tasks.EtcTableApp.2
            public void open(Sink.InitContext initContext) {
            }

            public HttpSinkRequestEntry apply(String str, SinkWriter.Context context) {
                return new HttpSinkRequestEntry(WebHookConfigSourceDefinition.INSERT_METHOD, str.getBytes(StandardCharsets.UTF_8));
            }
        });
        elementConverter.setHttpPostRequestCallback(new CustomHttpPostRequestCallback());
        HttpSinkBuilder property = elementConverter.setProperty("gid.connector.http.sink.request.batch.size", "200").setProperty("sink.flush-buffer.timeout", "5000").setProperty("gid.connector.http.sink.writer.request.mode", "batch");
        if (!CollectionUtils.isEmpty(webHookConfigSourceDefinition2.getExtendMap())) {
            for (String str : webHookConfigSourceDefinition2.getExtendMap().keySet()) {
                String mapValue = webHookConfigSourceDefinition2.getMapValue(str);
                property = WebHookConfigSourceDefinition.CURRENT_REQUEST_USER_ID.equalsIgnoreCase(str) ? property.setProperty(WebHookConfigSourceDefinition.REQUEST_HEADER_ACCESS_TOKEN, JwtUtil.getToken(mapValue)) : property.setProperty(str, mapValue);
            }
        }
        return property.setProperty(WebHookConfigSourceDefinition.REQUEST_HEADER_AUTHORIZATION, String.format("Bearer %s", "xxxxx")).build();
    }

    private static RichSinkFunction<String> generateRichSinkFunction(WebHookConfigDefinition webHookConfigDefinition) {
        Optional<WebHookConfigSourceDefinition> findFirst = webHookConfigDefinition.getSources().stream().filter(webHookConfigSourceDefinition -> {
            return webHookConfigSourceDefinition.getType() == WebHookConfigSourceType.HttpSink;
        }).findFirst();
        if (!findFirst.isPresent()) {
            return null;
        }
        final WebHookConfigSourceDefinition webHookConfigSourceDefinition2 = findFirst.get();
        return new RichSinkFunction<String>() { // from class: com.bcxin.tenant.data.etc.table.tasks.EtcTableApp.3
            private WebHttpSinkWriter webHttpSinkWriter;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                this.webHttpSinkWriter = new WebHttpSinkWriter(HttpExecutionOptions.DEFAULT_SIZE, 10000, WebHookConfigSourceDefinition.this);
            }

            public void close() throws Exception {
                this.webHttpSinkWriter.close();
                super.close();
            }

            public void invoke(String str, SinkFunction.Context context) throws Exception {
                this.webHttpSinkWriter.write(str, (SinkWriter.Context) null);
            }
        };
    }

    private static Properties loadProperties(String str) throws IOException {
        Properties properties = System.getProperties();
        InputStream resourceAsStream = PropertyUtils.class.getClassLoader().getResourceAsStream(str);
        try {
            properties.load(resourceAsStream);
            if (resourceAsStream != null) {
                resourceAsStream.close();
            }
            return properties;
        } catch (Throwable th) {
            if (resourceAsStream != null) {
                try {
                    resourceAsStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 205683750:
                if (implMethodName.equals("lambda$main$6d6c4f31$1")) {
                    z = false;
                    break;
                }
                break;
            case 485393027:
                if (implMethodName.equals("lambda$main$9e9e3ace$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/table/tasks/EtcTableApp") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Z")) {
                    return str4 -> {
                        return str4 != null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/bcxin/tenant/data/etc/table/tasks/EtcTableApp") && serializedLambda.getImplMethodSignature().equals("(Lcom/bcxin/event/core/JsonProvider;Lorg/apache/flink/types/Row;)Ljava/lang/String;")) {
                    JsonProvider jsonProvider = (JsonProvider) serializedLambda.getCapturedArg(0);
                    return row -> {
                        if (row.getKind() == RowKind.UPDATE_BEFORE) {
                            return null;
                        }
                        Set<String> fieldNames = row.getFieldNames(true);
                        HashMap hashMap = new HashMap();
                        for (String str42 : fieldNames) {
                            if (row.getField(str42) != null) {
                                hashMap.put(str42, row.getField(str42));
                            }
                        }
                        return jsonProvider.getJson(hashMap);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
