多线程执行器通道可加速使用者进程

问题描述 投票:0回答:1

我有一个消息生成器,它每秒产生大约15条消息

使用者是一个Spring集成项目,它从Message Queue进行消耗,并进行了大量处理。当前,它是单线程的,无法与生产者发送消息的速率匹配。因此队列深度不断增加

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
                .aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {

                    boolean eonExists = g.getMessages().stream()
                            .anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
                    if (eonExists) {
                        boolean einExists = g.getMessages().stream()
                                .anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
                        if (einExists) {
                            return true;
                        }
                    }
                    return false;
                }).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();

是否有可能使用执行程序通道在多线程环境中处理此问题并加快使用者进程的速度

如果是,请建议我如何实现-为确保消息排序,我需要将相同类型的消息(基于消息的ID)分配给执行者通道的相同线程。

spring spring-integration spring-integration-dsl
1个回答
0
投票

因此队列深度不断增加

由于看起来您的队列在JMS代理上的某个位置,因此确实可以实现这种行为。这正是设计消息传递系统的目的-区分生产者和消费者,并尽可能处理目标中的消息。

如果要从JMS增加轮询,可以考虑在JMS容器上使用concurrency选项:

/**
 * The concurrency to use.
 * @param concurrency the concurrency.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrency(String)
 */
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
    this.target.setConcurrency(concurrency);
    return this;
}

/**
 * The concurrent consumers number to use.
 * @param concurrentConsumers the concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
    this.target.setConcurrentConsumers(concurrentConsumers);
    return this;
}

/**
 * The max for concurrent consumers number to use.
 * @param maxConcurrentConsumers the max concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
    this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
    return this;
}

[查看文档的更多信息:https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/integration.html#jms-receiving

但是,这不允许您“将消息分配给特定的线程”。就像在JMS中无法分区一样。

我们可以通过Spring Integration使用router根据您的“基于消息的ID”以及配置有单线程ExecutorChannel的特定Executor实例来完成此操作。每个ExecutorChannel将成为仅具有单个线程的专用执行程序。这样,您将确保具有相同分区键的消息的顺序,并将并行处理它们。所有的ExecutorChannel可以具有相同的订户,也可以将bridge分配给相同的频道进行处理。

但是,请记住,当您离开JMS侦听器线程时,您将完成JMS事务,并且无法在该单独的线程中处理消息,则可能会丢失消息。

© www.soinside.com 2019 - 2024. All rights reserved.