I am trying to run a Flink job that consumes data from/to a Kafka source. To submit the job I use the PyFlink library.
I am using: Python 3.8.0, Flink 1.19.0
The docker file of the project:
version: "3.0"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - uwb_streaming_net

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    expose:
      - '29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    networks:
      - uwb_streaming_net

  jobmanager:
    image: pyflink:latest
    ports:
      - "8081:8081"
    expose:
      - "6123"
    command: jobmanager # cp opt/flink/conf/config.yaml opt/flink && jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./opt/flink:/flink
      - ./kafka_test:/kafka_test
    networks:
      - uwb_streaming_net

  taskmanager:
    image: pyflink:latest
    depends_on:
      - jobmanager
    expose:
      - "6121"
      - "6122"
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - uwb_streaming_net

  app:
    # /flink/bin/flink run -py /taskscripts/app.py --jobmanager jobmanager:8081 --target local
    image: flink-app
    build:
      context: .
      dockerfile: ./App.DockerFile
    depends_on:
      - broker
    entrypoint:
      - tail
    command:
      - -f
      - /dev/null
    volumes:
      - ./opt/flink:/opt/flink
      - ./kafka_test:/kafka_test
    networks:
      - uwb_streaming_net

networks:
  uwb_streaming_net:
    external: false
    name: uwb_streaming_net
For the Flink job and task managers I have to use a custom image on top of the official Flink image, which adds Python to those containers:
FROM flink:latest

# install python3: Debian 11 updated Python to 3.9, so install Python 3.8 from source
# (PyFlink currently only officially supports Python 3.6, 3.7 and 3.8)
RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev liblzma-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
    tar -xvf Python-3.8.0.tgz && \
    cd Python-3.8.0 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# install PyFlink
# COPY apache-flink*.tar.gz /
RUN pip3 install --upgrade pip
RUN wget https://files.pythonhosted.org/packages/6e/08/b36307d608aa76f14bfc972a0b1ff15920cdee54414864c9dca4cff5d065/apache-flink-libraries-1.19.0.tar.gz && \
    pip3 install apache-flink-libraries*.tar.gz && \
    rm -rf apache-flink-libraries*.tar.gz
RUN wget https://files.pythonhosted.org/packages/4f/d7/633fefca40de6522bb704e9daaabecff27c15c521cede6a0450cedba8b9a/apache-flink-1.19.0.tar.gz && \
    pip3 install apache-flink*.tar.gz && \
    rm -rf apache-flink*.tar.gz
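A quick way to confirm which versions actually end up inside the image, using only the standard library (check_env.py is just a hypothetical name for this helper):
# check_env.py -- run inside the custom image, e.g.:
#   docker run --rm pyflink:latest python /kafka_test/check_env.py
import sys
from importlib.metadata import version  # available since Python 3.8

print(sys.version)              # expect 3.8.0
print(version('apache-flink'))  # expect 1.19.0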
The Python script that submits the job fails on the
tbl_env.execute_sql(src_ddl)
command, for any value assigned to the src_ddl variable:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import os
from pyflink.java_gateway import get_gateway


def main():
    jvm = get_gateway().jvm
    jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
    classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()

    # Initialize the StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///opt/flink/flink-sql-connector-kafka_2.11-1.13.0.jar")
    settings = EnvironmentSettings.new_instance()\
        .in_streaming_mode()\
        .build()

    # create table environment
    tbl_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                            environment_settings=settings)

    src_ddl = """
        CREATE TABLE incomming (
            msg VARCHAR,
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'test',
            'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
            'properties.group.id' = 'mygroup',
            'format' = 'json'
        );
    """
    tbl_env.execute_sql(src_ddl)
    tbl = tbl_env.from_path('incomming')
    tbl.print_schema()

    sql = """
        SELECT
            msg,
        FROM incomming;
    """
    message = tbl_env.sql_query(sql)
    message.print_schema()

    snk_ddl = """
        CREATE TABLE outcome (
            msg VARCHAR,
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'test_from_kafka',
            'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
            'properties.group.id' = 'mygroup',
            'format' = 'json'
        );
    """
    tbl_env.execute_sql(snk_ddl)

    message.execute_insert('outcome').wait()

    # Execute the job
    env.execute("Kafka Streaming Job")


if __name__ == '__main__':
    main()
This is what I get in the exception log:
Traceback (most recent call last):
  File "kafka_test/flink.py", line 76, in <module>
    main()
  File "kafka_test/flink.py", line 42, in main
    tbl_env.execute_sql(src_ddl)
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 836, in execute_sql
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1613, in _before_execute
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1548, in _add_jars_to_j_env_config
  File "/flink/opt/python/pyflink.zip/pyflink/common/configuration.py", line 83, in parse_jars_value
  File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/main.py", line 451, in load
    return constructor.get_single_data()
  File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/constructor.py", line 112, in get_single_data
    node = self.composer.get_single_node()
  File "_ruamel_yaml.pyx", line 707, in ruamel.yaml.clib._ruamel_yaml.CParser.get_single_node
  File "_ruamel_yaml.pyx", line 904, in ruamel.yaml.clib._ruamel_yaml.CParser._parse_next_event
ruamel.yaml.parser.ParserError: did not find expected <document start>
  in "<unicode string>", line 1, column 44
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    ... 14 more
The examples provided by Apache work fine, but none of them use the
execute_sql()
command.
I have tried updating ruamel.yaml and testing different values of
src_ddl
. I don't know what else to try.
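For example, even a DDL that only uses the built-in datagen connector (which ships with Flink, so no connector jar is involved) fails on the same call; a minimal sketch of such a repro, with the rest of the script unchanged:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/flink-sql-connector-kafka_2.11-1.13.0.jar")
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env)

# datagen needs no external jar, yet execute_sql() fails the same way --
# the traceback above shows it dies in _before_execute, before the DDL is parsed
tbl_env.execute_sql("""
    CREATE TABLE minimal_src (
        msg STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.msg.length' = '10'
    )
""")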
What you call a docker file doesn't look like a
Dockerfile
as used by docker, but more like a
compose.yaml
that drives docker-compose.
The YAML that you pasted here loads without problems:
import sys
from pathlib import Path
import ruamel.yaml

input = Path('compose.yaml')
for t in ['rt', 'safe']:
    yaml = ruamel.yaml.YAML(typ=t)
    data = yaml.load(input)
    # yaml.dump(data, sys.stdout)
    print('keys:', list(data.keys()))
which gives:
keys: ['version', 'services', 'networks']
keys: ['version', 'services', 'networks']
Your toolchain looks like it is using
typ='safe'
, which is more restricted than the default
typ='rt'
, but as the output above shows, both should work.
I suggest trying your actual YAML files with the above program; there may be some indentation error in them, or they were made on Windows and contain hidden characters that confuse the parser.
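A rough way to check for such hidden characters, reading the raw bytes so nothing gets normalised on the way in (standard library only):
# scan a YAML file for characters that commonly confuse the parser
from pathlib import Path

raw = Path('compose.yaml').read_bytes()
if raw.startswith(b'\xef\xbb\xbf'):
    print('file starts with a UTF-8 BOM')
if b'\r' in raw:
    print('file contains carriage returns (CRLF / Windows line endings)')
if b'\t' in raw:
    print('file contains tabs (YAML indentation must use spaces)')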