具有超时和组大小的聚合需要大量时间

问题描述 投票:0回答:1

我有这个集成流程

IntegrationFlows.from(MessageChannels.queue(MailPushGateway.SEND_MAIL_NOTIFICATION_CHANNEL, Integer.MAX_VALUE))
                .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression(TracingConstants.USER_ID, "payload.getUser()"))
                .transform(Message.class, mailPushTransformer::toPushNotification)
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .expireGroupsUponTimeout(true)
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        .expireTimeout(1000)
                        .requiresReply(false)
                        .messageStore(new SimpleMessageStore(Integer.MAX_VALUE))
                        .async(true)
                        .correlationStrategy(message -> true)
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(600, 1000))
                        .notPropagatedHeaders(TracingConstants.USER_ID)
                        .headersFunction(messageGroup -> Map.of(TracingConstants.MESSAGES_IDS,MailPushFlow.collectMessageIds(messageGroup)))
                        .outputProcessor(MailPushFlow::extractPayload)
                        .advice(requestHandlerRetryAdvice)
                        .releaseLockBeforeSend(true)
                )
                .handle(mailPushService, "sendMailPushNotificationChunk",
                        e -> e.id(MAIL_PUSH_NOTIFICATION_GATEWAY)
                                .advice(this.requestHandlerRetryAdvice)
                                .async(true)
                                .requiresReply(false))
                .get();

我认为使用 TimeoutCountSequenceSizeReleaseStrategy 消息是按顺序发送的,但如果我检查执行时间(基于较旧的消息时间戳),聚合似乎需要大约 40 秒,然后慢慢减少到 1-2 秒。

这个配置有什么问题吗?我需要根据到达顺序发送消息组

spring spring-integration spring-integration-dsl
1个回答
0
投票

听起来像是按时间窗口:https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/。因此,您需要计算出一个时间段来决定哪些消息适合其中。周期和乘数可以作为组 ID(相关键)。

所以,让我们想象一下分组方面是

10 seconds period
。因此,第一组位于区间
0-10
中。接下来的
11-20
等等。您绝对可以将组 ID 作为间隔定义:
0-10
11-20
21-30
等。

自定义

CorrelationStrategy
将根据消息时间戳计算该组ID。不幸的是,这与框架无关。您只需要制定一个商业模式并实施相应的策略即可。

© www.soinside.com 2019 - 2024. All rights reserved.