在PyFlink作业中使用KafkaOffsetsInitializer.comfilled_offsets遇到Py4JError(版本1.17.2)

问题描述 投票:0回答:1

我正在使用 PyFlink 开发 Apache Flink(版本 1.17.2)的流应用程序,并且在配置 Kafka 源以从上次提交的偏移量恢复时遇到了问题。此外,我还在我的环境设置中包含了自定义 JAR 文件。在执行我的工作时,我遇到了一个与初始化 Kafka 偏移量特别相关的 Py4JError。

这是我看到的错误:

Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.committedOffsets. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method committedOffsets([class org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy]) does not exist


我正在尝试使用 KafkaOffsetsInitializer.comfilled_offsets() 来设置我的 Kafka 源的起始偏移量。除此之外,我还将以下 JAR 文件添加到我的 Flink 环境中以支持 Kafka 连接:

kafka-clients-3.6.0.jar
flink-connector-kafka-1.17.2.jar
flink-sql-connector-kafka-1.17.2.jar

这是我的代码片段,包括添加 JAR 文件的位置:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

# Adding JAR files
env.add_jars(
    "file:///opt/flink/lib/kafka-clients-3.6.0.jar",
    "file:///opt/flink/lib/flink-connector-kafka-1.17.2.jar",
    "file:///opt/flink/lib/flink-sql-connector-kafka-1.17.2.jar"
)

kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka-server:9092") \
    .set_topics("my-topic") \
    .set_group_id("my-consumer-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

env.add_source(kafka_source)

我担心的是:

Why is the Py4JError occurring, particularly in relation to KafkaOffsetsInitializer.committed_offsets()?
Could the inclusion of these JAR files be affecting how offsets are initialized or how the method is recognized?
Is there something I'm missing in the setup that's causing this method to not be recognized, given my PyFlink and Kafka connector versions?

任何有关解决此错误的建议或见解将不胜感激。我的目标是确保我的 Flink 作业可以从崩溃后停止的位置恢复处理,而不会重新处理或丢失数据。

尝试调查为什么会出现此问题,但没有找到任何文档指出我的 jar 文件之间不匹配

apache-kafka apache-flink pyflink
1个回答
0
投票

您只需要 flink-sql-connector-kafka 并且应该删除其他的。仅使用 flink-connector-kafka 时我遇到了同样的错误。一旦替换为flink-sql-connector-kafka,错误就不再存在。我使用的是最新版本3.0.2和flink 1.18.1。

© www.soinside.com 2019 - 2024. All rights reserved.