使用连接器在 kubernetes 上部署 pyflink(kafka/kinesis)

问题描述 投票:0回答:1

我正在尝试找到一种使用 k8s 运算符在 k8s 上部署 pyflink 的方法。我已经能够使用 k8s Operator 上传作业,但我找不到如何向其添加连接器(例如 kafka-connector.jar 或 kinesis-connector.jar)。我找不到更多有关如何将 pyflink 与 k8s 运算符一起使用的文档,而且我对 java 不熟悉。所以这对我来说是一个死胡同

我基于这个存储库,其中指出使用 FlinKDeployment.yaml 部署一个演示 pyflink,该演示 pyflink 下沉到控制台(假设有一个 k8s 集群,并且操作员已经在运行以将其应用到)。我已经遵循了并且它有效。但现在我只是想弄清楚如何在其上添加源/接收器连接器.jar。

我遵循了关于

如何在 python 中使用连接器的文档,该文档在本地运行良好。但由于某种原因不在 k8s 运营商上。

我遵循的步骤是将连接器 flink-sql-connector-kinesis-1.16.2.jar 添加到我的 Dockerfile 映像中,并在 python 文件(也包含在映像中)上添加了引用它的代码,这在本地运行良好:

env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(stream_execution_environment=env) CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) t_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.16.2.jar", )
然后我使用 kinesis 创建一个水槽表

t_env.execute_sql( """ CREATE TABLE print_table (<columns...>) WITH ( 'connector' = 'kinesis', 'stream' = '<stream_name>', 'aws.region' = '<aws_region>', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
但是,当我基于该存储库创建 FlinkDeployment 时,我发现它找不到 Kinesis,可能是因为这不是将connector.jar 包含在使用 k8s 运算符提交的 pyflink 作业中的方法,我认为该作业使用 java 而不是 python ?? 。我得到的java错误是

Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.


我提交工作的方式是使用基于该存储库的清单,并且图像中的 python 文件确实可以在

/opt/flink/usrlib/python_demo.py

 :
找到

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-kinesis-smoke spec: image: <docker_hub_repo>/pyflink_kinesis:latest flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"] parallelism: 1 upgradeMode: stateless
我已经阅读了所有 

Flink K8s Operator 文档,但我找不到任何提及 pyflink 的内容,仅提交已完全打包在 .jar 文件中的作业,这不是我的用例,因为我使用的是 pyflink。我还发现了这个关于使用 FlinkCluster 类型的清单的其他存储库,但我无法使其工作,因为 k8s 集群表示 FlinkCluster 类型不存在。

有人知道如何在应用程序或会话模式下使用 k8s 操作员上的连接器部署 pyflink 吗?我相信我唯一的选择是

使用 CLI 提交,我希望避免使用 k8s 运算符(如果可能的话)

apache-flink flink-streaming pyflink
1个回答
0
投票
您可以在 Docker 映像中烘焙 jar,然后使用以下方法将它们拉入正在运行的脚本中:

env.add_jars("file:///path/to/flink-sql-connector-kafka-1.16.2.jar")
详细信息请参见:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/

© www.soinside.com 2019 - 2024. All rights reserved.