我有一个目前在下面使用Spring Cloud Streams和RabbitMQ的项目。我已经实现了逻辑based on the documentation。见下文:
@Component
public class ReRouteDlq {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
它完成了预期的工作,但是它绑定到RabbitMQ,并且我的公司计划在一两年内停止使用此消息代理(不知道为什么,这一定是一件疯狂的事情)。因此,我想实现同一件事,但将其与任何消息代理分离。
我尝试用这种方式更改rePublish
方法,但是它不起作用:
@StreamListener(Sync.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
[失败,因为Message
类具有不可变的标头-在put
企图说您无法更改其值时抛出异常(使用org.springframework.messaging.Message
类)。
有没有一种以消息代理独立的方式实现此死信队列处理程序的方法?
恕我直言,检测失败消息并将其发送到应用程序代码中的死信队列的主要问题是,这种方法远非防弹。
如果您收到一条消息,该消息将使应用程序节点消耗consumption尽,该怎么办?这样的消息将永远不会到达您的死信队列逻辑,并继续使应用程序节点崩溃,直到从队列中手动删除该消息为止。
[在发布消息的dead letter queue policy之后,在服务器上配置negative acknowledgement并让RabbitMQ将消息移至死信队列会更安全。如果还配置message ttl,则应该能够处理使节点崩溃的消息。
使用
MessageBuilder.fromMessage(message)
.setHeader("foo", "bar")
...
.build();