:)
我已经结束了自己在一个(奇怪的)情况下,短暂的,我不想消费任何来自卡夫卡的新记录,所以。暂停 火花流的消耗量 (InputDStream[ConsumerRecord])对题目中的所有分区,进行一些操作,最后。简历 消耗记录。
首先... 可不可以
我一直在尝试这样的事情。
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但我得到了这个。
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
欢迎大家帮助我了解我遗漏了什么,以及为什么我得到的结果是空的,而很明显,消费者已经分配了分区。
版本卡夫卡 0.10Spark: 2.3.0Scala: 2.11.8
是的,可以在你的代码中添加检查点,并通过持久性存储(本地磁盘,S3,HDFS)路径。
每当你开始恢复你的工作时,它将从检查指向中获取Kafka消费者组信息和消费者偏移量,并从它停止的地方开始处理。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)