How do I submit a Beam Python job to Kubernetes using the Flink runner?


I want to run a continuous streaming job with Beam on the Flink runner inside Kubernetes. I have been following this tutorial (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb), but I'm not sure what the author means by the "flink master container". I also don't understand how the Python code is supposed to be submitted to the cluster when that code is defined inside the container image itself.

The Flink cluster architecture on Kubernetes is as follows:

  • A single JobManager, with the Flink web UI exposed through a Service and an Ingress

  • Multiple TaskManagers, each running two containers:

    • the Flink TaskManager
    • a Beam worker pool, exposing port 50000

The Python code in the tutorial's example configures Beam like this:

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000"
])

Clearly, when you run this locally as the tutorial does, it talks to the Beam worker pool to start the application. But if I have a Docker image containing my application code and I want to launch that application in Kubernetes, where in the Kubernetes cluster should this image be deployed? Does it run as a container in each TaskManager pod (and therefore talk to Beam at localhost:50000)? Or do I create a single pod containing my application code and point that pod at my TaskManagers' port 50000? If the latter, is having multiple TaskManagers a problem?

Any pointers to documentation or examples would be very helpful. The answer to this other question is incomplete.

2 Answers

Use a custom image for both the Flink JobManager and the TaskManagers.

Dockerfile

FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0

# Install Python 3.7
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    python -m pip install --upgrade pip; \
    pip install apache-flink==${FLINK_VERSION}; \
    pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]

# Copy files from the official SDK image, including the boot script and dependencies.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

The deployments below also need a volume for storing staging artifacts, so create a PVC like this, adjusting storageClassName to match your cluster:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard

The deployment YAML using the custom Flink image should look like this:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
        # Beam worker pool sidecar; keep this SDK image in line with the
        # Python/Beam versions used to build and submit the job.
        - name: beam-worker-pool
          image: apache/beam_python3.7_sdk:2.48.0
          args: ["--worker_pool"]
          ports:
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 50000
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
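These manifests also reference a flink-config ConfigMap that is not shown in the answer. A minimal sketch of what it could contain, based on the standard Flink-on-Kubernetes reference configuration (the slot count, parallelism, and log pattern here are assumptions to adapt):

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink
data:
  flink-conf.yaml: |
    # Must match the JobManager Service name used above.
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    blob.server.port: 6124
    taskmanager.rpc.port: 6122
    taskmanager.numberOfTaskSlots: 2
    parallelism.default: 1
  log4j-console.properties: |
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n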

And here are the Flink pipeline options:

flink_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])
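For context, a minimal self-contained pipeline sketch using these options might look like the following; the Create/Map steps are placeholders for your own transforms:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

flink_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])

# Submits the job to the JobManager; the transforms then execute in the
# beam-worker-pool sidecar next to each TaskManager.
with beam.Pipeline(options=flink_options) as pipeline:
    (
        pipeline
        | "Create" >> beam.Create(["hello", "beam", "on", "flink"])
        | "Upper" >> beam.Map(str.upper)
        | "Print" >> beam.Map(print)
    )

Note that the container running this code needs in-cluster network access to flink-jobmanager:8081 and beam-worker-pool:50000 for the Service names above to resolve.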

Run the Beam code from a separate container (a CronJob/Job or a Deployment): build an image containing your Python code, make sure it is based on

apache/beam_python3.7_sdk:2.48.0

and install Java inside it so that the expansion service can start in-process; otherwise Beam falls back to launching it with Docker, which is typically not available inside a pod.

Adjust the SDK versions to your needs.
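As a minimal sketch of such a submitter image, assuming the pipeline code lives in a hypothetical main.py and that a plain JRE is enough for the expansion service:

FROM apache/beam_python3.7_sdk:2.48.0

# Java is needed so cross-language expansion services can run in-process
# instead of Beam trying to launch them via Docker.
RUN apt-get update && \
    apt-get install -y --no-install-recommends default-jre && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY main.py .

# Overrides the SDK image's boot entrypoint: this container submits the
# pipeline to the Flink JobManager on start instead of acting as a worker.
ENTRYPOINT ["python", "main.py"]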
