Apache Flink job execution error when using PyFlink

Question
start reading data from kafka
Using Any for unsupported type: typing.Sequence[~T]
Traceback (most recent call last):
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 104, in <module>
    read_from_kafka()
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 83, in read_from_kafka
    env.execute("Kafka Streaming Job")
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\datastream\stream_execution_environment.py", line 813, in execute
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\util\exceptions.py", line 146, in deco
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9.execute.
: java.net.MalformedURLException: no protocol: ['file:/C:/flink-1.19.0-bin-scala_2.12/flink-1.19.0/opt/flink-python-1.19.0.jar']
        at java.base/java.net.URL.<init>(URL.java:645)
        at java.base/java.net.URL.<init>(URL.java:541)
        at java.base/java.net.URL.<init>(URL.java:488)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more

I am trying to run a Kafka consumer job on Flink using PyFlink. The code is written with the pyflink Python package.

How do I run this job from the Flink dashboard on Windows? I have completed all the configuration settings, and a simple word-count Python job executed successfully, but this Kafka consumer Python code does not execute in the Flink dashboard.

Please help me resolve this.

apache-flink pyflink
1 Answer

Short answer: all you need is the Kafka connector that matches your Flink version.

Where do you get it? Simple: from the Maven Central Repository (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka).

But wait: you are using Flink 1.19, and a Kafka connector release for that version is not yet available as of March 23, 2024 (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/).

So you need to build it from source. How do you do that? Fortunately, the dependencies are already available.

Here are the steps:

  1. Make sure you have Java 11 and Maven installed (I used Maven 3.9.6).
  2. Check out the repository (https://github.com/apache/flink-connector-kafka).
  3. Build the Kafka connector JAR for Flink 1.19 (see the snippet below).
  4. The JAR will be generated at flink-connector-kafka/target/flink-connector-kafka-3.1-SNAPSHOT.jar.
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
mvn clean package -Dflink.version=1.19.0 -DskipTests

Now, what do you do with this JAR?

  1. Make sure apache-flink==1.19.0 is installed:
    pip install apache-flink==1.19.0
  2. Create a table/stream environment in PyFlink and register the JARs:
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming TableEnvironment.
env_settings = EnvironmentSettings.in_streaming_mode()
s_env = TableEnvironment.create(env_settings)

# FLINK 1.19.0
# Directory holding the connector JARs (adjust to your machine).
DEPS_DIR = "/Users/sourav/Documents/workspace/entropy-backend/infra-stream/lib-flink-1.19"

# Register the JARs as a single ';'-separated string of file:// URLs.
# DEPS_DIR is already absolute, so "file://" + DEPS_DIR yields "file:///...".
s_env.get_config().set(
    "pipeline.jars",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)
s_env.get_config().set(
    "pipeline.classpaths",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)

# Inspect the effective configuration.
s_env.get_config().get_configuration().to_dict()
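One caveat: the value of pipeline.jars must be a single ';'-separated string of URLs, not a Python list. The MalformedURLException in the question (no protocol: ['file:/...'], with literal brackets) suggests a stringified list ended up in the configuration. On Windows the URLs must also take the file:///C:/... form; here is a minimal sketch of building them safely with pathlib (the JAR location below is a placeholder, adjust it to your machine):

from pathlib import Path

# Hypothetical Windows location of the connector JAR -- adjust as needed.
jar = Path(r"C:\flink-deps\flink-connector-kafka-3.1-SNAPSHOT.jar")

# Path.as_uri() yields a well-formed URL, e.g.
# file:///C:/flink-deps/flink-connector-kafka-3.1-SNAPSHOT.jar
s_env.get_config().set("pipeline.jars", jar.as_uri())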

Test that it works:

s_env.execute_sql('DROP TABLE IF EXISTS t1')

s_env.execute_sql("""
    CREATE TABLE t1 (
      symbol STRING,
      price FLOAT,
      ltt TIMESTAMP(3),
      WATERMARK FOR ltt AS ltt - INTERVAL '1' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.bootstrap.servers' = 'kafka-host:port',
      'properties.group.id' = 'MY_TEST_GROUP',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    )
""")
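Finally, the traceback in the question comes from the DataStream API rather than the Table API. The same JARs can be registered there with add_jars before building the source; a sketch under the same assumptions (the JAR paths, broker address, and topic are placeholders):

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()
# Each JAR URL is a separate argument -- do not pass a stringified list.
env.add_jars(
    "file:///C:/flink-deps/flink-connector-kafka-3.1-SNAPSHOT.jar",
    "file:///C:/flink-deps/kafka-clients-3.7.0.jar",
)

# A KafkaSource roughly equivalent to the table definition above.
source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka-host:port") \
    .set_topics("test") \
    .set_group_id("MY_TEST_GROUP") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds.print()
env.execute("Kafka Streaming Job")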