我有一条使用Spring Boot和RabbitMQ处理任务的服务管道。
每个服务消耗队列中的消息,进行一些工作,然后重新发送,作为要由下一个服务处理的消息。
在此过程的每个步骤中,我正在向中央队列发送another消息,并对该任务进行状态更新(将其保留在数据库中)。
我已经根据this article实现了错误处理。简而言之:如果出错,消息将被拒绝给具有TTL的DLQ,然后重新排队进入主队列。
在错误处理程序中,我区分致命(业务)异常和可重试异常。
我是这样实现的:
@RabbitListener(queues = "task.step.first", errorHandler = "customErrorHandler")
@SendTo("task.status")
public Status process(Task task) {
// Check if retries < max
// Do some heavy work. Create nextTask
rabbitTemplate.convertAndSend("task.step.second", nextTask);
return new Status("Step 1 Success!");
}
@Component
public class CustomErrorHandler implements RabbitListenerErrorHandler {
@Override
public Object handleError(Message amqpMessage,
org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception {
// Check if error is fatal or retryable
if (exception.getCause() /* ..is fatal? */) {
return new Status("FAIL!");
}
// Unknown exception, rethrow it and let message to be NACKed and retried via DLQ
// How to still send 'new Status("Retrying...")' here while NACKing the message?
throw exception;
}
}
如您所见,我真的不知道如何处理最坏的情况。
我想(重新)抛出错误处理程序中的原始异常,导致消息被NACK处理,路由到DLQ并重试,但是我also想要发回中间消息状态消息作为答复。
当然,我可以直接将RabbitTemplate
插入CustomErrorHandler
并将状态消息手动发送到@SendTo
队列,但是我想知道是否有更好的方法直接利用回复通道而不必手动注入另一个RabbitTemplate。
有趣的用例。
如果使用MANUAL
ack模式,则可以在侦听器中执行此操作...
public Status process(Task task, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTAG) {
...
if (good) {
channel.basicAck(deliveryTag, false);
return status;
}
else {
throw someException
}
然后,在错误处理程序中,channel.basicReject()
,而不是引发异常。
通道与DELIVERY_TAG一起在Message
AmqpHeaders.CHANNEL
标头中可用。
通道已添加到2.2.0和2.1.7中的标题中。在早期版本中不可用。