有什么不同? KafkaConsumer和KafkaListener一词可以互换使用吗?
@KafkaListener
是ConcurrentMessageListenerContainer
的高级API,它在KafkaConsumer
周围产生了几个内部听众。
不同之处在于,当您需要时调用KafkaConsumer
时,poll()
API可根据需要进行调查。监听器抽象即将围绕poll()
进行无限循环,并且只要它们出现在poll()
上,就会生成记录消息。我们有一个任务执行器,它运行如下逻辑:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
KafkaConsumer.poll()
被称为pollAndInvoke();
。