我有这个集成流程
IntegrationFlows.from(MessageChannels.queue(MailPushGateway.SEND_MAIL_NOTIFICATION_CHANNEL, Integer.MAX_VALUE))
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression(TracingConstants.USER_ID, "payload.getUser()"))
.transform(Message.class, mailPushTransformer::toPushNotification)
.aggregate(aggregatorSpec -> aggregatorSpec
.expireGroupsUponTimeout(true)
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.expireTimeout(1000)
.requiresReply(false)
.messageStore(new SimpleMessageStore(Integer.MAX_VALUE))
.async(true)
.correlationStrategy(message -> true)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(600, 1000))
.notPropagatedHeaders(TracingConstants.USER_ID)
.headersFunction(messageGroup -> Map.of(TracingConstants.MESSAGES_IDS,MailPushFlow.collectMessageIds(messageGroup)))
.outputProcessor(MailPushFlow::extractPayload)
.advice(requestHandlerRetryAdvice)
.releaseLockBeforeSend(true)
)
.handle(mailPushService, "sendMailPushNotificationChunk",
e -> e.id(MAIL_PUSH_NOTIFICATION_GATEWAY)
.advice(this.requestHandlerRetryAdvice)
.async(true)
.requiresReply(false))
.get();
我认为使用 TimeoutCountSequenceSizeReleaseStrategy 消息是按顺序发送的,但如果我检查执行时间(基于较旧的消息时间戳),聚合似乎需要大约 40 秒,然后慢慢减少到 1-2 秒。
这个配置有什么问题吗?我需要根据到达顺序发送消息组
听起来像是按时间窗口:https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/。因此,您需要计算出一个时间段来决定哪些消息适合其中。周期和乘数可以作为组 ID(相关键)。
所以,让我们想象一下分组方面是
10 seconds period
。因此,第一组位于区间 0-10
中。接下来的11-20
等等。您绝对可以将组 ID 作为间隔定义:0-10
、11-20
、21-30
等。
自定义
CorrelationStrategy
将根据消息时间戳计算该组ID。不幸的是,这与框架无关。您只需要制定一个商业模式并实施相应的策略即可。