我目前在Dataproc上运行spark作业,尝试重新加入一个群组并从kafka主题读取数据时遇到错误。我做了一些挖掘,不确定是什么问题。我将auto.offset.reset
设置为earliest
,因此应该从最早可用的未提交偏移量中进行读取,并且最初我的火花日志如下所示:
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-7 to offset 5555542.```
但是在接下来的一行中,我尝试从服务器上不存在的偏移量读取时出现错误(您可以看到分区的偏移量与上面列出的偏移量不同,因此我不知道为什么要尝试这样做。读取偏移量的形式,这是下一行的错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
out of range with no configured reset policy for partitions:
{demo.topic-11=4544296}
关于为什么我的火花作业会不断回到此偏移量(4544296),而不是它最初输出的偏移量(5553330)的任何想法?
它似乎与a矛盾,它与a)表示其打开的实际偏移量和它试图读取的偏移量有关,b)表示没有配置的重置策略。>
我目前在Dataproc上运行spark作业,尝试重新加入一个群组并从kafka主题读取数据时遇到错误。我做了一些挖掘,不确定是什么问题。我有自动...。
一年到晚的答案,但希望能帮助其他面临类似问题的人。