Spring-Cloud-Streams Kafka - 如何阻止消费者

问题描述 投票:0回答:1

我有一个Spring-Cloud-Streams客户端,它读取了一个由几个分区组成的Kakfa主题。客户端为其读取的每个Kafka消息调用Web服务。如果几次重试后web服务不可用,我想阻止消费者阅读Kafka。参考之前的Stackoverflow问题(Spring cloud stream kafka pause/resume binders),我自动连接BindingsEndpoint并调用changeState()方法尝试停止使用者,但日志显示消费者在调用changeState()后继续读取来自Kafka的消息。

我使用Spring Boot版本2.1.2.RELEASE与Spring Cloud版本Greenwich.RELEASE。 spring-cloud-stream-binder-kafka的托管版本是2.1.0.RELEASE。我设置了属性autoCommitOffset = true和autoCommitOnError = false。

下面是我的代码片段。有没有我错过的东西?是changeState()的第一个输入参数应该是主题名吗?

如果我希望在Web服务不可用时退出使用者应用程序,我可以简单地执行System.exit()而无需先停止使用者吗?

@Autowired
private BindingsEndpoint bindingsEndpoint;

...
...
@StreamListener(MyInterface.INPUT)  
    public void read(@Payload MyDTO dto,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {

            logger.info("Processing message "+dto);
            process(dto); // this is the method that calls the webservice


        } catch (Exception e) {

            if (e instanceof IllegalStateException || e instanceof ConnectException) {

                bindingsEndpoint.changeState("my.topic.name", 
                    BindingsEndpoint.State.STOPPED);    
                // Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object                     
            }

                e.printStackTrace();
                throw (e);
  }
}
spring-boot apache-kafka spring-cloud-stream
1个回答
1
投票

您可以通过使用Binding visualization and control功能来实现,您可以在其中进行可视化以及停止/启动/暂停/恢复绑定。

另外,你知道System.exit()会关闭整个JVM吗?

© www.soinside.com 2019 - 2024. All rights reserved.