我试图找出处理在聚合的组超时发生后调用的服务中可能发生的错误的最佳方法,该服务模仿相同的流,就像满足releaseExpression一样。
这是我的设置:
我有一个AmqpInboundChannelAdapter,它接收消息并将它们发送到我的聚合器。
当满足releaseExpression并且在groupTimeout到期之前,如果在我的ServiceActivator中抛出异常,则消息将被发送到我的死信队列中以获取该MessageGroup中的所有消息。 (以下示例中的10条消息,仅用于说明目的)这就是我所期望的。
如果我的releaseExpression没有得到满足但是groupTimeout已经满足并且组超时,如果在我的ServiceActivator中抛出异常,则消息不会被发送到我的死信队列并被激活。
在阅读了另一篇博文后,link1提到这是因为处理发生在MessageGroupStoreReaper的另一个线程中,而不是SimpleMessageListenerContainer所在的线程。一旦处理离开SimpleMessageListener的线程,消息将是auto ack。
我添加了上面链接中提到的配置,并查看发送到我的错误处理程序的错误消息。我的主要问题是,被认为是处理此方案以最大限度地减少消息丢失的最佳方法。
以下是我正在探索的选项:
理想情况下,我希望有一个流程,当满足releaseExpression时,它会模仿相同的流程,这样消息就不会丢失。
有没有人有关于处理他们过去使用过的场景的最佳方法的建议?
感谢您的帮助和/或建议!
这是我使用Spring Integration DSL的当前配置
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(4);
container.setDefaultRequeueRejected(false);
container.setTransactionManager(transactionManager);
container.setChannelTransacted(true);
container.setTxSize(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public AmqpInboundChannelAdapter inboundRabbitMessages() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());
return adapter;
}
我已经定义了一个错误通道,并定义了我自己的taskScheduler用于MessageStoreRepear
@Bean
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler ts = new ThreadPoolTaskScheduler();
MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
mpe.setDefaultErrorChannel(myErrorChannel());
ts.setErrorHandler(mpe);
return ts;
}
@Bean
public PollableChannel myErrorChannel() {
return new QueueChannel();
}
public IntegrationFlow aggregationFlow() {
return IntegrationFlows.from(inboundRabbitMessages())
.transform(Transformers.fromJson(SomeObject.class))
.aggregate(a->{
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
}
)
.handle("someService", "processMessages")
.get();
}
这是我的自定义错误流程
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from("myErrorChannel")
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.channel("myErrorChannelHandler")
.handle("myErrorHandler","handleFailedMessage")
.log()
.get();
}
这是自定义错误处理程序
@Component
public class MyErrorHandler {
@Autowired
BatchingRabbitTemplate batchingRabbitTemplate;
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(Message<?> message) {
ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();
payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
}
}
这是BatchingRabbitTemplate bean
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);
batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
return batchingRabbitTemplate;
}
更新1)以显示自定义消息组处理器:
public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return group;
}
}
示例服务:
@Slf4j
public class SomeService {
@ServiceActivator
public void processMessages(MessageGroup messageGroup) throws IOException {
Collection<Message<?>> messages = messageGroup.getMessages();
//Do business logic
//ack messages in the group
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug(" deliveryTag = {}",deliveryTag);
log.debug("Channel = {}",channel);
channel.basicAck(deliveryTag, false);
}
}
}
更新了integrationFlow
public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
新的ErrorHandler做nack
public class MyErrorHandler {
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
if(messageGroup!=null) {
log.debug("Nack messages size = {}", messageGroup.getMessages().size());
Collection<Message<?>> messages = messageGroup.getMessages();
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug("deliveryTag = {}",deliveryTag);
log.debug("channel = {}",channel);
channel.basicNack(deliveryTag, false, false);
}
}
}
}
更新2添加了自定义发布策略并更改为聚合器
public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {
private static final int MAX_MESSAGE_COUNT = 10;
public boolean canRelease(MessageGroup messageGroup) {
return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
}
}
public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.transactional(true);
a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
您的理解存在一些缺陷。如果您使用AUTO,则只有最后一条消息在发生异常时才会被删除。在发布之前成功存入组中的邮件将立即被确认。
实现您想要的唯一方法是使用MANUAL acks。
没有办法“告诉侦听器容器向DLQ发送消息”。容器从不向DLQ发送消息,它拒绝消息并且代理将其发送到DLX / DLQ。