Spring 集成流程:寻找避免由于递归流程而导致 java.lang.StackOverflowError 的策略

问题描述 投票:0回答:2
使用

Spring-Integration

 版本 
5.5.18
,我有一个 
flow
,它适用于小型输入集,但由于通过 
channel
 进行递归调用并使用 
filter
 结束递归,因此对于较大的输入集会遇到 
java.lang.StackOverflowError
。对于修改流程以减少产生
java.lang.StackOverflowError
的可能性,是否有任何建议? (我确实知道可以为 JVM 请求更大的线程堆栈。)是否有一种方法可以通过 DSL 声明以迭代方式(而不是递归方式)实现这一点?或者这可能需要一个 
MessageHandler
,在 
Spring-Integration
 
Java-DSL
 之外用 
Java
 执行迭代?

展示问题的示例流程可能是:

package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.*;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

/**
 * Flow definitions from the question: a flow that loops by sending each message back to the
 * channel it was just consumed from. Per the question text, this recursion is what triggers
 * {@code java.lang.StackOverflowError} for larger payloads.
 */
@Configuration
public class OverflowFlows {

    private static final String CURRENT_ITERATION = "currentIteration";
    private static final String ACCUMULATOR = "accumulator";
    private static final String RECURSION_CHANNEL = "recurseToBodyValue";

    /** Filter configuration whose discard flow is a plain bridge. */
    public static final Consumer<FilterEndpointSpec> DISCARD_RETURNS_CURRENT_MESSAGE =
            discardReturnsCurrentMessage();

    /** Sub-flow used by the gateway step; it only forwards to the recursion channel. */
    private final IntegrationFlow callRecurseToBodyValueFlow =
            flow -> flow.channel(RECURSION_CHANNEL);

    /**
     * Entry flow: starts both headers at zero, calls the recursive flow through a gateway and
     * replaces the payload with the value of the {@code accumulator} header.
     */
    @Bean
    public IntegrationFlow callRecursiveFlow() {
        return IntegrationFlows
                .from("createsOverflow")
                .enrichHeaders(headers -> headers.headerFunction(CURRENT_ITERATION, message -> 0))
                .enrichHeaders(headers -> headers.headerFunction(ACCUMULATOR, message -> 0))
                .gateway(callRecurseToBodyValueFlow)
                .transform("headers.accumulator")
                .get();
    }

    /**
     * Recursive flow: increments {@code currentIteration}, stops (via the filter's discard flow)
     * once it is no longer below the payload, otherwise adds it to {@code accumulator} and sends
     * the message back to the channel this flow reads from.
     */
    @Bean
    IntegrationFlow recurseToBodyValueFlow() {
        return IntegrationFlows
                .from(RECURSION_CHANNEL)
                .enrichHeaders(headers ->
                        headers.headerExpression(CURRENT_ITERATION, "headers.currentIteration + 1", true))
                .filter("headers.currentIteration < payload", DISCARD_RETURNS_CURRENT_MESSAGE)
                .enrichHeaders(headers ->
                        headers.headerExpression(ACCUMULATOR, "headers.accumulator + headers.currentIteration", true))
                // Loop back: the output channel is the same channel the flow consumes from.
                .channel(RECURSION_CHANNEL)
                .get();
    }

    /**
     * Builds a filter configuration whose discard flow consists of a single bridge.
     *
     * @return a consumer that installs the bridging discard flow on a filter endpoint spec
     */
    public static Consumer<FilterEndpointSpec> discardReturnsCurrentMessage() {
        return filterSpec -> filterSpec.discardFlow(discarded -> discarded.bridge());
    }
}

与以下 
MessageChannel
 和 
MessagingGateway
 的声明结合使用:

package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

/**
 * Declares the message channel that the gateway sends requests to.
 */
@Component
public class OverflowMessageChannels {

    /**
     * Creates the direct channel registered under the bean name {@code createsOverflow}.
     *
     * @return a new direct message channel
     */
    @Bean(name = "createsOverflow")
    public MessageChannel createsOverflow() {
        final MessageChannel requestChannel = MessageChannels.direct().get();
        return requestChannel;
    }
}
package org.example.filter;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

/**
 * Messaging gateway named {@code overflowGateway} that exposes the flow as a plain Java call.
 *
 * NOTE(review): {@code defaultReplyTimeout = "20"} is presumably in milliseconds; confirm
 * against the Spring Integration documentation, as this is a short wait for large inputs.
 */
@MessagingGateway(name = "overflowGateway", defaultReplyTimeout = "20")
public interface OverflowGateway {
    /**
     * Sends {@code value} as a request to the {@code createsOverflow} channel.
     *
     * @param value the request payload
     * @return the reply converted to an {@code Integer}
     */
    @Gateway(requestChannel = "createsOverflow")
    Integer createsOverflow(Integer value);
}

1000 次迭代的单元测试会导致堆栈溢出。

任何有关策略的指示将不胜感激。

import org.example.filter.OverflowGateway; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = org.example.filter.ApiConfiguration.class) public class OverflowTest { @Autowired OverflowGateway overflowGateway; @Test public void call_with_one_returns_one() { assertThat(overflowGateway.createsOverflow(1), equalTo(sumOfRange(1))); } @Test public void call_with_20_returns_20() { assertThat(overflowGateway.createsOverflow(20), equalTo(sumOfRange(20))); } @Test public void call_with_1000_returns_1000() { // causes java.lang.StackOverflowError assertThat(overflowGateway.createsOverflow(1000), equalTo(sumOfRange(1000))); } private static int sumOfRange(final int topOfRange) { return IntStream.range(1, topOfRange).sum(); } }
    
spring spring-integration spring-integration-dsl
2个回答
0
投票
我认为这与 Spring Integration 无关。仅使用普通 Java 同样会遇到

StackOverflowError

,参见"java 调用堆栈的最大深度是多少?"。是的,Spring Integration 基于通道和端点之间消息传递的特性会增加一些开销,但从逻辑上讲,这并不重要:如果您的逻辑在同一线程中递归,那么随着该循环中步骤的增加,您最终都会遇到 StackOverflowError。

不过,您可以打破这种递归,让

RECURSION_CHANNEL

 成为 
ExecutorChannel
 实例。通过这种方式,您可以将该循环的每次迭代推送到单独的线程中,并且调用堆栈变得不递归。


0
投票
我的解决方法是编写一个显式的

MessageHandler

 来执行迭代。这不是我想要的形式,但它实现了我想要的效果。

测试方法

call_with_1000_returns_1000()

在这种形式下可以通过。

package org.example.filter; import org.springframework.context.annotation.Bean; import org.springframework.integration.dsl.*; import org.springframework.stereotype.Component; @Component public class OverflowFlows { public static final String CURRENT_ITERATION = "currentIteration"; public static final String ACCUMULATOR = "accumulator"; private final IteratingMessageHandler iteratingMessageHandler; public OverflowFlows(final IteratingMessageHandler iteratingMessageHandler) { this.iteratingMessageHandler = iteratingMessageHandler; } @Bean public IntegrationFlow callRecursiveFlow() { return IntegrationFlows .from("createsOverflow") .enrichHeaders(h -> h.headerFunction(CURRENT_ITERATION, m -> 0)) .enrichHeaders(h -> h.headerFunction(ACCUMULATOR, m -> 0)) .gateway(callIterativeHandler) .transform("headers.accumulator") // put final head value in body .get(); } private final IntegrationFlow callIterativeHandler = flow -> flow.handle(iteratingMessageHandler); }
package org.example.filter;

import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import static org.example.filter.OverflowFlows.ACCUMULATOR;
import static org.example.filter.OverflowFlows.CURRENT_ITERATION;

/**
 * Reply-producing handler that performs the accumulation with a plain loop inside a single
 * invocation, so the call stack does not grow with the number of iterations.
 *
 * <p>Starting from the {@code currentIteration} header, the counter is incremented at least
 * once; every counter value that is still below the integer payload is added to the
 * {@code accumulator} header. The reply is the request message with both headers updated.
 */
@Component
public class IteratingMessageHandler extends AbstractReplyProducingMessageHandler {

    /**
     * Runs the iteration and returns the message carrying the final header values.
     *
     * @param message request whose payload is the exclusive upper bound and whose
     *                {@code currentIteration} and {@code accumulator} headers hold the start state
     * @return a copy of {@code message} with the updated headers
     * @throws IllegalArgumentException if the payload or a required header is not an {@code Integer}
     */
    @Override
    protected Object handleRequestMessage(final Message<?> message) {
        final int limit = requireInteger(message.getPayload(), "payload");
        final int first =
                requireInteger(message.getHeaders().get(CURRENT_ITERATION), "header " + CURRENT_ITERATION) + 1;

        if (first >= limit) {
            // The counter always advances once; nothing is accumulated, so the accumulator
            // header is neither read nor written in this case.
            return MessageBuilder.fromMessage(message)
                    .setHeader(CURRENT_ITERATION, first)
                    .build();
        }

        int accumulator =
                requireInteger(message.getHeaders().get(ACCUMULATOR), "header " + ACCUMULATOR);
        for (int iteration = first; iteration < limit; iteration++) {
            accumulator += iteration;
        }

        // Build the reply once instead of copying the message on every iteration.
        return MessageBuilder.fromMessage(message)
                .setHeader(CURRENT_ITERATION, limit)
                .setHeader(ACCUMULATOR, accumulator)
                .build();
    }

    /** Returns {@code value} as an int, or fails with a message naming the offending part. */
    private static int requireInteger(final Object value, final String description) {
        if (!(value instanceof Integer)) {
            throw new IllegalArgumentException(
                    "Expected an Integer " + description + " but got: " + value);
        }
        return (Integer) value;
    }
}
    
© www.soinside.com 2019 - 2024. All rights reserved.