我们有一个消费者组和三个主题,这三个主题都是不同的模式。创建了一个使用for循环的消费者,一次传递每个主题并轮询处理并手动提交。使用的方法是消费者创建的常用方法,在for循环中,我一次订阅一个主题并处理数据。我看到消费者的随机延迟,虽然主题有数据我的消费者从主题获取没有记录和有时提取。当我使用单个主题而不是循环遍历三个主题时,它正在工作但无法重现。需要帮助来调试问题并重现相同的,
您可以创建一个类似于任何主题的骨架线程,而不是在一个方法中循环三个主题。 See examples here
我不知道这是否会“修复”这个问题,但是尝试在一个应用程序中使用不同模式的主题消费通常不是一种可扩展的模式,但是你真正想要做的并不是很清楚。
class ConsumerThread extends Thread {
KafkaConsumer consumer;
AtomicBoolean stopped = new AtomicBoolean();
ConsumerThread(Properties props, String subscribePattern) {
this.consumer = new KafkaConsumer...
this.consumer.subscribe(subscribePattern);
}
@Override
public void run() {
while (!this.stopped.get()) {
... records = this.consumer.poll(100);
for ( ... each record ... ) {
// Process record
}
}
}
public void stop() {
this.stopped.set(true);
}
}
不是生产级的
然后独立运行三个消费者。
new ConsumerThread("t1").start();
new ConsumerThread("t2").start();
new ConsumerThread("t3").start();
注意:KafkaConsumer
不是线程安全的。