Business goals
1. Incremental data from the BaiBaoDun (百保盾) source business systems is collected and pushed to the target system automatically and incrementally, per business requirements.
2. Intermediate data produced during collection must be retained.
3. Collection must be timely while guaranteeing data validity (a realtime pipeline with an offline batch fallback is acceptable).
4. Complete collection/sync logging.

Reference: https://juejin.cn/post/6969874734355841031
Environment: a highly available collection environment on K8s

[Start minikube and preload images]
minikube delete
minikube start --memory=7g --disk-size=30g --registry-mirror=https://registry.docker-cn.com
docker pull zookeeper:latest
minikube image load zookeeper:latest
docker pull apache/kafka:latest
minikube image load apache/kafka:latest
docker pull starrocks/fe-ubuntu
minikube image load starrocks/fe-ubuntu
docker pull confluentinc/cp-kafka:7.3.0
minikube image load confluentinc/cp-kafka:7.3.0
kubectl port-forward svc/kafka-service 9092:9092

[Deploy StarRocks]
Reference: https://zhuanlan.zhihu.com/p/647059827
Install the StarRocks K8s Operator:
kubectl apply -f starRocks/operator.yaml
Install the starrockscluster CRD:
kubectl apply -f starRocks/starrocks.com_starrocksclusters.yaml
Deploy StarRocks:
kubectl apply -f starRocks/starrocks-fe-and-be.yaml
Connect from inside the Kubernetes cluster:
kubectl get svc -n starrocks
(screenshot of the service list omitted)
Connect from outside the Kubernetes cluster by switching the FE service to a LoadBalancer:
kubectl -n starrocks patch --type=merge src starrockscluster-sample -p '{"spec":{"starRocksFeSpec":{"service":{"type": "LoadBalancer"}}}}'
Then check the connection IP with:
kubectl get svc -n starrocks
Show the current context and namespace:
kubectl config get-contexts
Switch the current namespace:
kubectl config set-context --current --namespace=starrocks
Expose the FE query port externally:
kubectl --namespace=starrocks port-forward svc/starrockscluster-sample-fe-search 9030:9030

[Flink on K8s]
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/deployment/kubernetes/
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/
kubectl create -f k8s-operator/cert-manager.yaml
helm.exe repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm.exe install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set webhook.create=false

[Demo]
kubectl apply -f k8s-operator/demos/basic.yaml
Tail the logs:
kubectl logs -f deploy/basic-example
https://cloud.tencent.com/developer/article/2345731

[Start a Flink session]
kubectl apply -f flink-session-cluster.yaml
If `kubectl get deployments` shows the deployment stuck, inspect the cause with:
kubectl describe deployment flink-session-cluster-example

[Adjust available memory for WSL2]
Edit C:\Users\hanch\.wslconfig:
[wsl2]
memory=11GB     # maximum memory WSL2 may use
processors=4    # number of CPU cores WSL2 may use
swap=11GB       # WSL2 swap size

[Check minikube's available memory]
minikube start --disk-size=30g --registry-mirror=https://registry.docker-cn.com --force
minikube start --disk-size=50g --registry-mirror=https://registry.docker-cn.com
minikube ssh
free -m
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set webhook.create=false

[Session mode, works on JDK 17]
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster -Drest.bind-address=0.0.0.0
kubectl delete deployment/my-first-flink-cluster
kubectl port-forward svc/my-first-flink-cluster-rest 8081:8081 --address 0.0.0.0
vi ~/.bashrc
source ~/.bashrc
ifconfig eth0 | grep 'inet ' | awk '{print $2}'
iptables -t nat -A PREROUTING -p tcp --dport $EXTERNAL_PORT -j DNAT --to-destination $INTERNAL_IP:$INTERNAL_PORT
iptables -t nat -A PREROUTING -p tcp --dport 8082 -j DNAT --to-destination 127.0.0.1:8081
iptables -t nat -A POSTROUTING -j MASQUERADE

[Dedicated namespace and service account]
kubectl create namespace flink-cluster
kubectl create serviceaccount flink -n flink-cluster
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-cluster:flink
./bin/kubernetes-session.sh -Dkubernetes.namespace=flink-cluster -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=my-session -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=1 -Dtaskmanager.numberOfTaskSlots=4 -Dresourcemanager.taskmanager-timeout=3600000
kubectl get pods -n flink-cluster
kubectl get svc -n flink-cluster
kubectl port-forward svc/my-session-rest 8081:8081 -n flink-cluster --address 0.0.0.0
kubectl config set-context --current --namespace=flink-cluster
kubectl delete deployment/my-session -n flink-cluster
List all jobs:
./bin/flink list -Dkubernetes.namespace=flink-cluster --target kubernetes-application -Dkubernetes.cluster-id=my-session
https://developer.aliyun.com/article/913111
kubectl config get-contexts

[K8s: submit jobs]
kubectl delete deployment/my-session -n flink-cluster
./bin/kubernetes-session.sh -Dkubernetes.namespace=flink-cluster -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=my-session -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=1 -Dtaskmanager.numberOfTaskSlots=4 -Dresourcemanager.taskmanager-timeout=3600000
./bin/flink run --target kubernetes-session -Dkubernetes.namespace=flink-cluster -Dkubernetes.cluster-id=my-session ./examples/streaming/TopSpeedWindowing.jar
kubectl get pods -n flink-cluster
./bin/flink run-application --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.container.image.ref=custom-image-name \
  local:///opt/flink/usrlib/my-flink-job.jar

The following error occurred during execution:
UnknownHostException: my-session-rest.flink-cluster: Name or service not known
Fix: add this entry to /etc/hosts:
127.0.0.1 my-session-rest.flink-cluster

[Flink SQL CDC sync]
cd /mnt/e/software/flink/flink-1.20.0-bin-scala_2.12/flink-1.20.0
kubectl get pods -n flink-cluster
./bin/sql-client.sh

CREATE TABLE source_tenant_users (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '49.4.21.141',
  'port' = '45611',
  'username' = 'bcx',
  'password' = 'Wstestv5qy#2022',
  'database-name' = 'obpm2',
  'table-name' = 'tenant_users'
);

CREATE TABLE sink_tenant_users (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://49.4.21.141:45611/obpm2?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true&zeroDateTimeBehavior=CONVERT_TO_NULL',
  'table-name' = 'tenant_users',
  'username' = 'bcx',
  'password' = 'Wstestv5qy#2022'
);

INSERT INTO sink_tenant_users SELECT * FROM source_tenant_users;

Local-environment variant of the sink and source tables:

CREATE TABLE sink_tenant_users (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.21.24.221:3306/obpm2?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&zeroDateTimeBehavior=CONVERT_TO_NULL',
  'table-name' = 'tenant_users',
  'username' = 'root',
  'password' = '123456'
);

CREATE TABLE source_tenant_users_local (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.21.24.221',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'obpm2',
  'table-name' = 'tenant_users',
  'server-time-zone' = 'UTC'
);

INSERT INTO sink_tenant_users SELECT * FROM source_tenant_users_local;

[Apache StreamPark: E:\software\streampack]
docker pull apache/streampark
Update the image version in docker-compose.yaml to the latest, then:
docker-compose up -d

SET 'scan.startup.mode' = 'latest-offset';
SET 'execution.checkpointing.interval' = '3s';
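The CDC pipeline above covers the realtime path; business goal 3 also calls for an offline fallback that guarantees data validity. A minimal sketch of such a batch reconciliation check follows, assuming both source and sink tables can be snapshotted as `{primary key: row}` maps (e.g. via periodic JDBC queries); `row_digest` and `reconcile` are hypothetical helpers, not part of Flink or StarRocks:

```python
import hashlib

def row_digest(row: dict) -> str:
    """Stable digest of a row, used to compare the source and sink copies."""
    payload = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.md5(payload.encode("utf-8")).hexdigest()

def reconcile(source_rows: dict, sink_rows: dict):
    """Compare two {primary_key: row} snapshots.

    Returns (missing, mismatched): keys absent from the sink, and keys
    present in both snapshots whose row contents differ.
    """
    missing = [pk for pk in source_rows if pk not in sink_rows]
    mismatched = [
        pk for pk in source_rows
        if pk in sink_rows
        and row_digest(source_rows[pk]) != row_digest(sink_rows[pk])
    ]
    return missing, mismatched

if __name__ == "__main__":
    # Toy snapshots standing in for the tenant_users source and sink tables.
    source = {"u1": {"id": "u1", "name": "alice"}, "u2": {"id": "u2", "name": "bob"}}
    sink = {"u1": {"id": "u1", "name": "alice"}, "u2": {"id": "u2", "name": "b0b"}}
    print(reconcile(source, sink))  # ([], ['u2'])
```

Rows flagged as missing or mismatched can be re-pushed by the batch job and written to the sync log, which also serves goal 4 (complete collection/sync logging).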