Spring cloud stream + spring retry,如何添加恢复回调并禁用发送到DLQ的逻辑?

问题描述 投票:0回答:1

我正在使用春季云流+兔子mq绑定器

在我的[[@StreaListener]中]我想使用RetryTemplate在特定的异常上应用重试逻辑。重试用尽或抛出不可重试的错误后,我想添加一个恢复回调,该回调将一条带有错误消息的新记录保存到我的Postgres DB中,并完成该消息(移至下一条)。这是我到目前为止所得到的: @StreamListener(Sink.INPUT) public void saveUser(User user) { User user = userService.saveUser(user); //could throw exceptions log.info(">>>>>>User is created successfully: {}", user); } @StreamRetryTemplate public RetryTemplate myRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>(); retryableExceptions.put(ConnectionException.class, true); retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) { return true; } @Override public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { //could add recovery logic here, like save error to db why sertain user was not saved log.info("retries exausted"); } @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { log.error("Error on retry", throwable); } }); retryTemplate.setRetryPolicy( new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true)); return retryTemplate; }

从属性,我只有这些(没有任何dlq配置)

spring.cloud.stream.bindings.input.destination = user-topic spring.cloud.stream.bindings.input.group = user-consumer

重试用尽后,我得到此日志。

2020-06-01 20:05:58.674 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:56722] 2020-06-01 20:05:58.685 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://[email protected]:56722/, localPort= 50728] 2020-06-01 20:05:58.697 INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration : retry finish 2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)

在触发RetryListener close方法之后,我可以看到侦听器尝试连接到DLX可能发布了错误消息。而且我不希望它这样做并且每次都在日志中观察到此错误消息。

所以我的问题是:

1)在哪里为我的retryTemplate添加RecoveryCalback?大概我可以在RetryListener#close方法中将保存错误的恢复逻辑写入db,但是肯定应该有更合适的方法来做到这一点。

2)如何配置Rabbit-MQ活页夹不向DLQ发送消息,也许我可以重写某些方法?当前,重试用尽(或出现不可重试的错误)之后,侦听器将尝试向DLX发送消息并记录找不到该错误的错误。我不需要将任何消息发送到应用程序范围内的dlq,只需要将其保存到DB。

我正在使用Spring Cloud Stream + Rabbit MQ活页夹。在我的@StreaListener中,我想使用RetryTemplate在特定异常上应用重试逻辑。重试用尽或无法重试后,错误是...

spring-cloud spring-cloud-stream spring-rabbitmq spring-retry
1个回答
0
投票
当前没有提供自定义恢复回调的机制。
© www.soinside.com 2019 - 2024. All rights reserved.