当一个kafka主题分区中没有数据时,结构化流作业失败

问题描述 投票:0回答:2

当我尝试通过偏移时间戳从 kafka 主题检索数据时,作业失败并出现错误:

原因:java.lang.AssertionError:断言失败:没有与主题分区 topicA-0 和时间戳 1686877634000 的请求匹配的偏移量。

问题是主题中实际上有数据,但只在partition-1中,但作业仍然失败,因为partition-0中没有数据。

我使用的代码:

        df_stream = (
            spark.read.format("kafka")
            .option("kafka.bootstrap.servers", kafka_servers)
            .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            .option("subscribe", kafka_topic)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.group.id", kafka_group_id)
            .option("startingOffsetsByTimestamp", """{"topicA":{"0": 1686877634000, "1": 1686877634000}}""")
            .option("endingOffsetsByTimestamp", """{"topicA":{"0": 1686881234000, "1": 1686881234000}""")
            .option("failOnDataLoss", false) 
            .options(**options)
            .load()
        )

有办法避免这个错误吗?我需要检索特定时间戳的数据。我尝试读取主题中的所有数据并按 kafka 时间戳进行过滤,它有效,但我认为这不是一个好的解决方案。

apache-spark apache-kafka spark-structured-streaming
2个回答
0
投票

您应该将分区 0 的起始偏移量设置为另一个值。 如果您查看

checkpointing
目录(在 SparkSession 创建期间设置),并检查
offsets
目录,您应该能够设置分区的值来启动流。


0
投票

添加选项“startingOffsetsByTimestampStrategy”->“最新” https://www.mail-archive.com/[电子邮件受保护]/msg311812.html

© www.soinside.com 2019 - 2024. All rights reserved.