执行程序的Spring Integration DSL自定义错误通道问题

问题描述 投票:0回答:1

嗨,我有一个文件侦听器,它并行读取文件/一次读取多个文件

package com.example.demo.flow;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.Executors;

/**
 * Created by muhdk on 03/01/2020.
 */
@Component
@Slf4j
public class TestFlow {

    @Bean
    public StandardIntegrationFlow errorChannelHandler() {

        return IntegrationFlows.from("testChannel")
                .handle(o -> {

                    log.info("Handling error....{}", o);
                }).get();
    }

    @Bean
    public IntegrationFlow testFile() {


        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                        .errorChannel("testChannel")))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {

                    throw new RuntimeException("Failing on purpose");

                }).handle(o -> {
                });

        return testChannel.get();


    }


}

不会转到我的自定义错误频道

但是如果我删除行

            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))

然后转到错误通道。

如何使它起作用,以便它与执行程序一起转到我的自定义错误通道。

java spring-boot spring-integration spring-integration-dsl spring-integration-file
1个回答
0
投票

似乎在将Executor服务与多条消息一起使用时,它与正常的errorChannel不起作用,我不知道为什么

我做了这样的更改

@Bean
public IntegrationFlow testFile() {


    IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
            e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)
            ))
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))

            .transform(o -> {

                throw new RuntimeException("Failing on purpose");

            }).handle(o -> {
            });

    return testChannel.get();


}

这里的线

        .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))

其余部分保持不变,并且可以使用。

© www.soinside.com 2019 - 2024. All rights reserved.