如何使用spring-cloud-stream实现Kafka使用者按需处理事件?

问题描述 投票:1回答:2

我正在尝试实现一个PollableConsumer,它在某种情况下开始轮询来自Kafka的消息,在这种情况下,当我在SpringBoot应用程序中点击一个端点时。

我尝试了在一定条件下触发轮询器的多种方法,但显然它只有在从kafka主题不断轮询时才有效。 (就像spring-cloud-stream docs中的所有示例一样)

我正在寻找这样的东西:

public interface CustomProcessor {
    @Input
    PollableMessageSource input();
}
 public void run() {
        boolean result = true;
        while (result) {
            result = input.poll(m -> {
                Event event = (Event) m.getPayload();
                GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
                eventMessageConsumer.consume(genericMessage);
            }, new ParameterizedTypeReference<Event>() {
            });

            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (result) {
                System.out.println("Success");
            }
        }
    }

当我点击这样的端点时,可能会触发这个:

@GetMapping("/process")
public void process() {
   SomeClass.run();
}
spring-cloud-stream
2个回答
0
投票

显然暂时不可能使用spring-cloud-stream暂停PollableConsumer,所以我回到基于事件的消息消耗并使用执行器来控制绑定的状态。在this线程和spring-cloud-stream docs之后,我注入了BindingsEndpoint并改变了绑定的状态,如下所示:

@RestController
public class EventController {
    @Autowired
    public EventController(BindingsEndpoint bindingsEndpoint) {
        this.bindingsEndpoint = bindingsEndpoint;
    }
    @GetMapping("/changeState")
    public void sendMessage(@RequestParam("state") String state) {
        if (state.equals("paused")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.PAUSED);
        }
        if (state.equals("resumed")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.RESUMED);
        }
    }

这不是我想要实现的,但它足够接近。


-1
投票

在你的代码示例中,我不确定input.poll返回什么,因为你将结果分配给boolean,而这不是KafkaConsumer.poll方法返回的内容。

卡夫卡消费者可以按需暂停和重新启动,这应该不是问题。

来自最新的KafkaConsumer javadoc:

消费流量控制

...

Kafka支持动态控制消耗流,方法是使用pause(Collection)和resume(Collection)暂停指定分配的分区上的消耗,并在将来的poll(Duration)调用中分别恢复指定的暂停分区上的消耗。

然后从pause方法描述:

请注意,此方法不会影响分区订阅。特别是,在使用自动分配时,它不会导致组重新平衡。

https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

因此,在您的情况下,您可以简单地使用volatile boolean isPaused标志,如果更改了,您可以根据需要发出pauseresume命令。 KafkaConsumer将完成剩下的工作。

另请注意,KafkaConsumer不是线程安全的,因此您应该在轮询数据的同一线程上发出这些命令。

© www.soinside.com 2019 - 2024. All rights reserved.