我正在尝试实现一个PollableConsumer,它在某种情况下开始轮询来自Kafka的消息,在这种情况下,当我在SpringBoot应用程序中点击一个端点时。
我尝试了在一定条件下触发轮询器的多种方法,但显然它只有在从kafka主题不断轮询时才有效。 (就像spring-cloud-stream docs中的所有示例一样)
我正在寻找这样的东西:
public interface CustomProcessor {
@Input
PollableMessageSource input();
}
public void run() {
boolean result = true;
while (result) {
result = input.poll(m -> {
Event event = (Event) m.getPayload();
GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
eventMessageConsumer.consume(genericMessage);
}, new ParameterizedTypeReference<Event>() {
});
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
}
当我点击这样的端点时,可能会触发这个:
@GetMapping("/process")
public void process() {
SomeClass.run();
}
显然暂时不可能使用spring-cloud-stream暂停PollableConsumer,所以我回到基于事件的消息消耗并使用执行器来控制绑定的状态。在this线程和spring-cloud-stream docs之后,我注入了BindingsEndpoint并改变了绑定的状态,如下所示:
@RestController
public class EventController {
@Autowired
public EventController(BindingsEndpoint bindingsEndpoint) {
this.bindingsEndpoint = bindingsEndpoint;
}
@GetMapping("/changeState")
public void sendMessage(@RequestParam("state") String state) {
if (state.equals("paused")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.PAUSED);
}
if (state.equals("resumed")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.RESUMED);
}
}
这不是我想要实现的,但它足够接近。
在你的代码示例中,我不确定input.poll
返回什么,因为你将结果分配给boolean
,而这不是KafkaConsumer.poll
方法返回的内容。
卡夫卡消费者可以按需暂停和重新启动,这应该不是问题。
来自最新的KafkaConsumer
javadoc:
消费流量控制
...
Kafka支持动态控制消耗流,方法是使用pause(Collection)和resume(Collection)暂停指定分配的分区上的消耗,并在将来的poll(Duration)调用中分别恢复指定的暂停分区上的消耗。
然后从pause
方法描述:
请注意,此方法不会影响分区订阅。特别是,在使用自动分配时,它不会导致组重新平衡。
https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
因此,在您的情况下,您可以简单地使用volatile boolean isPaused
标志,如果更改了,您可以根据需要发出pause
或resume
命令。 KafkaConsumer将完成剩下的工作。
另请注意,KafkaConsumer不是线程安全的,因此您应该在轮询数据的同一线程上发出这些命令。