ruamel.yaml parser error when using PyFlink on Docker


I am trying to run a Flink job that consumes data from and produces data to a Kafka source. To submit the job I use the pyflink library.

I am using: Python 3.8.0, Flink 1.19.0

The docker file for the project:

version: "3.0"

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - uwb_streaming_net

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    expose:
      - '29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    networks:
      - uwb_streaming_net

  jobmanager:
    image: pyflink:latest
    ports:
      - "8081:8081"
    expose:
      - "6123"
    command:  jobmanager # cp opt/flink/conf/config.yaml opt/flink && jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./opt/flink:/flink
      - ./kafka_test:/kafka_test
    networks:
      - uwb_streaming_net

  taskmanager:
    image: pyflink:latest
    depends_on:
      - jobmanager
    expose:
      - "6121"
      - "6122"
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - uwb_streaming_net

  app:
    # /flink/bin/flink run -py /taskscripts/app.py --jobmanager jobmanager:8081 --target local
    image: flink-app
    build: 
      context: .
      dockerfile: ./App.DockerFile
    depends_on:
      - broker
    entrypoint:
      - tail
    command:
      - -f
      - /dev/null
    volumes:
      - ./opt/flink:/opt/flink
      - ./kafka_test:/kafka_test
    networks:
      - uwb_streaming_net

networks:
  uwb_streaming_net:
    external: false
    name: uwb_streaming_net

For the Flink jobmanager and taskmanager I have to use a custom image built on top of the official Flink image. This image adds Python to those containers:

FROM flink:latest

# install python3: Debian 11 has updated Python to 3.9, so install Python 3.8 from source
# PyFlink currently only officially supports Python 3.6, 3.7 and 3.8

RUN apt-get update -y && \
apt-get install -y build-essential libssl-dev liblzma-dev zlib1g-dev libbz2-dev libffi-dev && \
wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
tar -xvf Python-3.8.0.tgz && \
cd Python-3.8.0 && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# install PyFlink

# COPY apache-flink*.tar.gz /
RUN pip3 install --upgrade pip

RUN wget https://files.pythonhosted.org/packages/6e/08/b36307d608aa76f14bfc972a0b1ff15920cdee54414864c9dca4cff5d065/apache-flink-libraries-1.19.0.tar.gz && \
pip3 install apache-flink-libraries*.tar.gz && \
rm -rf apache-flink-libraries*.tar.gz

RUN wget https://files.pythonhosted.org/packages/4f/d7/633fefca40de6522bb704e9daaabecff27c15c521cede6a0450cedba8b9a/apache-flink-1.19.0.tar.gz && \
pip3 install apache-flink*.tar.gz && \
rm -rf apache-flink*.tar.gz

The Python that submits the job fails on the tbl_env.execute_sql(src_ddl) command, for any value assigned to the src_ddl variable:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import os
from pyflink.java_gateway import get_gateway


def main():

  jvm = get_gateway().jvm
  jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
  classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()

  # Initialize the StreamExecutionEnvironment
  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///opt/flink/flink-sql-connector-kafka_2.11-1.13.0.jar")

  settings = EnvironmentSettings.new_instance()\
                                .in_streaming_mode()\
                                .build()

  # create table environment
  tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
  
  src_ddl = """
  CREATE TABLE incomming (
    msg VARCHAR,
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
    'properties.group.id' = 'mygroup',
    'format'= 'json'
  );
  """
  tbl_env.execute_sql(src_ddl)
  
  tbl=tbl_env.from_path('incomming')
  tbl.print_schema()
  
  sql = """
  SELECT
    msg,
  FROM incomming;
  """
  
  message = tbl_env.sql_query(sql)
  message.print_schema()
  
  snk_ddl = """
  CREATE TABLE outcome (
    msg VARCHAR,
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'test_from_kafka',
    'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
    'properties.group.id' = 'mygroup',
    'format'= 'json'
  );
  """
  
  tbl_env.execute_sql(snk_ddl)
  
  message.execute_insert('outcome').wait()
  
  # Execute the job
  env.execute("Kafka Streaming Job")

if __name__ == '__main__':
    main()

This is what I get in the exception log:

Traceback (most recent call last):
  File "kafka_test/flink.py", line 76, in <module>
    main()
  File "kafka_test/flink.py", line 42, in main
    tbl_env.execute_sql(src_ddl)
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 836, in execute_sql
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1613, in _before_execute
  File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1548, in _add_jars_to_j_env_config
  File "/flink/opt/python/pyflink.zip/pyflink/common/configuration.py", line 83, in parse_jars_value
  File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/main.py", line 451, in load
    return constructor.get_single_data()
  File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/constructor.py", line 112, in get_single_data
    node = self.composer.get_single_node()
  File "_ruamel_yaml.pyx", line 707, in ruamel.yaml.clib._ruamel_yaml.CParser.get_single_node
  File "_ruamel_yaml.pyx", line 904, in ruamel.yaml.clib._ruamel_yaml.CParser._parse_next_event
ruamel.yaml.parser.ParserError: did not find expected <document start>
  in "<unicode string>", line 1, column 44
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more

The examples provided by Apache work fine, but none of them use the execute_sql() command.

I have tried updating ruamel.yaml and testing different values for src_ddl. I don't know what else to try.
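Judging from the traceback above, the failure happens in parse_jars_value, i.e. while PyFlink hands the pipeline.jars configuration value to ruamel.yaml, not while parsing the DDL itself. One way to narrow this down would be to feed that value to ruamel.yaml directly; the sketch below assumes the value is simply the jar URL passed to env.add_jars(), and it is not PyFlink's actual parsing code:

import ruamel.yaml

# The value env.add_jars() stores under pipeline.jars in the job above
# (an assumption; inspect tbl_env.get_config() to see the real value).
jars_value = "file:///opt/flink/flink-sql-connector-kafka_2.11-1.13.0.jar"

yaml = ruamel.yaml.YAML(typ='safe')
try:
    print(yaml.load(jars_value))
except ruamel.yaml.YAMLError as exc:
    print('ruamel.yaml could not parse the value:', exc)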

python docker apache-kafka ruamel.yaml pyflink
1 Answer

What you call a docker file does not look like a Dockerfile as used by docker, but more like a compose.yaml file that drives docker-compose.

The YAML you pasted here loads without problems:

import sys
from pathlib import Path
import ruamel.yaml

input = Path('compose.yaml')

# try both the round-trip ('rt') and the safe loader
for t in ['rt', 'safe']:
    yaml = ruamel.yaml.YAML(typ=t)
    data = yaml.load(input)
    # yaml.dump(data, sys.stdout)
    print('keys:', list(data.keys()))

That gives:

keys: ['version', 'services', 'networks']
keys: ['version', 'services', 'networks']

Your toolchain looks like it is using typ='safe', which is more restricted than the default (typ='rt'), but as you can see above, both should work.

I recommend running the actual YAML file(s) through the program above; there may be some indentation error, or they were made on Windows and contain hidden characters that confuse the parser.
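If you want to rule out such hidden characters, a minimal sketch along these lines can flag a UTF-8 BOM, carriage returns, and tabs (the path /opt/flink/conf/config.yaml is only an assumption about where the relevant file lives; point it at whatever file is actually being loaded):

from pathlib import Path

# Assumed location of the file to check; adjust to your setup.
path = Path('/opt/flink/conf/config.yaml')

raw = path.read_bytes()
if raw.startswith(b'\xef\xbb\xbf'):
    print('file starts with a UTF-8 BOM')

for lineno, line in enumerate(raw.splitlines(keepends=True), start=1):
    if b'\r' in line:
        print(f'line {lineno}: carriage return (CRLF line ending?)')
    if b'\t' in line:
        print(f'line {lineno}: tab character (YAML indentation must use spaces)')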
