我有一个Spring-Cloud-Streams客户端,它读取了一个由几个分区组成的Kakfa主题。客户端为其读取的每个Kafka消息调用Web服务。如果几次重试后web服务不可用,我想阻止消费者阅读Kafka。参考之前的Stackoverflow问题(Spring cloud stream kafka pause/resume binders),我自动连接BindingsEndpoint并调用changeState()方法尝试停止使用者,但日志显示消费者在调用changeState()后继续读取来自Kafka的消息。
我使用Spring Boot版本2.1.2.RELEASE与Spring Cloud版本Greenwich.RELEASE。 spring-cloud-stream-binder-kafka的托管版本是2.1.0.RELEASE。我设置了属性autoCommitOffset = true和autoCommitOnError = false。
下面是我的代码片段。有没有我错过的东西?是changeState()的第一个输入参数应该是主题名吗?
如果我希望在Web服务不可用时退出使用者应用程序,我可以简单地执行System.exit()而无需先停止使用者吗?
@Autowired
private BindingsEndpoint bindingsEndpoint;
...
...
@StreamListener(MyInterface.INPUT)
public void read(@Payload MyDTO dto,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
try {
logger.info("Processing message "+dto);
process(dto); // this is the method that calls the webservice
} catch (Exception e) {
if (e instanceof IllegalStateException || e instanceof ConnectException) {
bindingsEndpoint.changeState("my.topic.name",
BindingsEndpoint.State.STOPPED);
// Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object
}
e.printStackTrace();
throw (e);
}
}
您可以通过使用Binding visualization and control功能来实现,您可以在其中进行可视化以及停止/启动/暂停/恢复绑定。
另外,你知道System.exit()
会关闭整个JVM吗?