目前,我的Kafka Consumer流媒体应用程序手动将偏移量提交到Kafka,enable.auto.commit
设置为false
。当我尝试重新启动它时,应用程序失败,抛出以下异常:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}
假设上述错误是由于保留期间消息不存在/分区删除,我尝试了以下方法:
我禁用了手动提交并启用了自动提交(enable.auto.commit=true
和auto.offset.reset=earliest
)仍然失败并出现相同的错误
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}
请建议重新启动作业的方法,以便它可以成功读取消息/分区存在的正确偏移量
您正在尝试从主题155555555
的分区12
中读取偏移量partition
,但是 - 可能 - 由于您的保留策略,它可能已被删除。
您可以使用Kafka Streams Application Reset Tool重置Kafka Streams应用程序的内部状态,以便它可以从头开始重新处理其输入数据
$ bin/kafka-streams-application-reset.sh
Option (* = required) Description
--------------------- -----------
* --application-id <id> The Kafka Streams application ID (application.id)
--bootstrap-servers <urls> Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2
(default: localhost:9092)
--intermediate-topics <list> Comma-separated list of intermediate user topics
--input-topics <list> Comma-separated list of user input topics
--zookeeper <url> Format: HOST:POST
(default: localhost:2181)
或使用新的消费者群组ID启动您的消费者。
我遇到了同样的问题,我在我的应用程序中使用了包org.apache.spark.streaming.kafka010。在开始时,我怀疑auto.offset.reset策略不起作用,但是当我读到方法fixKafkaParams的描述时对象KafkaUtils,我发现配置已被覆盖。我猜它为执行程序调整配置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG的原因是为了保持驱动程序和执行程序获得的一致偏移量。