在SparkStreaming中暂停和恢复KafkaConsumer的工作。

问题描述 投票:0回答:1

:)

我已经结束了自己在一个(奇怪的)情况下,短暂的,我不想消费任何来自卡夫卡的新记录,所以。暂停 火花流的消耗量 (InputDStream[ConsumerRecord])对题目中的所有分区,进行一些操作,最后。简历 消耗记录。

首先... 可不可以

我一直在尝试这样的事情。

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

但我得到了这个。

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

欢迎大家帮助我了解我遗漏了什么,以及为什么我得到的结果是空的,而很明显,消费者已经分配了分区。

版本卡夫卡 0.10Spark: 2.3.0Scala: 2.11.8

apache-kafka spark-streaming
1个回答
0
投票

是的,可以在你的代码中添加检查点,并通过持久性存储(本地磁盘,S3,HDFS)路径。

每当你开始恢复你的工作时,它将从检查指向中获取Kafka消费者组信息和消费者偏移量,并从它停止的地方开始处理。

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
© www.soinside.com 2019 - 2024. All rights reserved.