当我尝试通过偏移时间戳从 kafka 主题检索数据时,作业失败并出现错误:
原因:java.lang.AssertionError:断言失败:没有与主题分区 topicA-0 和时间戳 1686877634000 的请求匹配的偏移量。
问题是主题中实际上有数据,但只在partition-1中,但作业仍然失败,因为partition-0中没有数据。
我使用的代码:
df_stream = (
spark.read.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("subscribe", kafka_topic)
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.group.id", kafka_group_id)
.option("startingOffsetsByTimestamp", """{"topicA":{"0": 1686877634000, "1": 1686877634000}}""")
.option("endingOffsetsByTimestamp", """{"topicA":{"0": 1686881234000, "1": 1686881234000}""")
.option("failOnDataLoss", false)
.options(**options)
.load()
)
有办法避免这个错误吗?我需要检索特定时间戳的数据。我尝试读取主题中的所有数据并按 kafka 时间戳进行过滤,它有效,但我认为这不是一个好的解决方案。
您应该将分区 0 的起始偏移量设置为另一个值。 如果您查看
checkpointing
目录(在 SparkSession 创建期间设置),并检查 offsets
目录,您应该能够设置分区的值来启动流。
添加选项“startingOffsetsByTimestampStrategy”->“最新” https://www.mail-archive.com/[电子邮件受保护]/msg311812.html