I want to run a continuous streaming job with Beam on the Flink runner inside Kubernetes. I have been following this tutorial (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb), but I'm not sure what the author means by the "Flink master container". I don't understand how the Python code is supposed to be submitted to the cluster when the code is defined inside the container image itself.
My Kubernetes Flink cluster architecture is as follows:
A single JobManager, exposing the Flink Web UI through a Service and an Ingress
Multiple TaskManagers, each running 2 containers: a Flink task manager and a Beam worker pool
The Python code from the example tutorial has the following Beam configuration:
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000"
])
Clearly, when you run this locally as in the tutorial, it talks to the Beam worker pool to start the application. But if I have a Docker image containing my application code and I want to launch that application in Kubernetes, where in the Kubernetes cluster should I deploy this image? Does it run as a container in each TaskManager pod (and therefore talk to Beam via localhost:50000)? Or do I create a single pod containing my application code and point that pod at port 50000 of my TaskManagers? If so, is having multiple TaskManagers a problem?
Any pointers to documentation or examples would be very helpful. The answer to this other question is incomplete.
The Flink Kubernetes Operator does provide a Beam example that addresses most of the tooling problems you are facing. It is written for Java, but by adding your Python source code to the Docker image you should be able to achieve what you are looking for.
Dockerfile
FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0
# Install python3.7
RUN set -ex; \
  apt-get update && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
  wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j4 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/* && \
  python -m pip install --upgrade pip; \
  pip install apache-flink==${FLINK_VERSION}; \
  pip install kafka-python
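# Install the Beam Python SDK (the [gcp] extra also pulls in the GCP IO dependencies)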
RUN pip install --no-cache-dir apache-beam[gcp]
# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
Also, the deployment above has no volume to store staging artifacts, so create a PVC like the one below and adjust storageClassName to match your cluster:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
The deployment YAML using the custom Flink image should look like this:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
        - name: beam-worker-pool
          image: apache/beam_python3.11_sdk
          args: ["--worker_pool"]
          ports:
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 50000
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
These are the Flink pipeline options:
flink_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])
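For reference, a minimal pipeline sketch using these options could look like the following. The step names and sample data are only for illustration, and it assumes apache-beam is installed where the script runs and that the flink-jobmanager and beam-worker-pool Services above are reachable from there:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

flink_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])

with beam.Pipeline(options=flink_options) as p:
    (
        p
        | "Create" >> beam.Create(["hello", "beam", "on", "flink"])
        | "Upper" >> beam.Map(str.upper)
        | "Print" >> beam.Map(print)  # user code runs inside the beam-worker-pool sidecar
    )
The JobManager coordinates the job, the TaskManagers execute it, and the actual Python DoFns are run by the beam-worker-pool sidecar because of environment_type=EXTERNAL.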
Run the Beam code from another container (it can be a CronJob/Job or a Deployment): build an image with your Python code, and make sure it is based on
apache/beam_python3.7_sdk:2.48.0
with Java installed inside it, so that the expansion service can start; otherwise it will try to use Docker. Adjust the SDK version to your needs (it should line up with the Beam SDK used by the beam-worker-pool sidecar).
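To illustrate why Java is needed in that submission image: cross-language transforms such as Kafka IO start a local Java expansion service while the pipeline is being constructed. A rough sketch (the broker address and topic name are made up) might look like this:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
    "--streaming",
])

with beam.Pipeline(options=options) as p:
    (
        p
        # Cross-language transform: expanding it launches a Java expansion service
        # in the submitting container, which is why Java must be installed there.
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "kafka:9092"},  # hypothetical broker
            topics=["events"])                                    # hypothetical topic
        | "Values" >> beam.Map(lambda kv: kv[1])
        | "Print" >> beam.Map(print)
    )
If your pipeline only uses Python transforms, the expansion service is never started and the Java requirement does not apply.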