Spring Integration file splitting sample does not react to content arriving on the inbound adapter channel


I am testing the following sample setup from the SI samples to understand it and adapt it to a similar set of tasks. I do not need the mail functionality, but the rest seems sufficient to experiment with.

DailyCashReportApplication.java

@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication  implements CommandLineRunner  {
    
    
    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // sftp

    @Value("${sftp.in.host}")
    @Getter
    private String host;

    @Value("${sftp.in.port}")
    private int port;

    @Value("${sftp.in.user}")
    private String user;

    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;


    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    @Value("${sftp.in.chmod}")
    private String chmod;

    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;



    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    @Autowired
    private ResourceLoader resourceLoader;

    
//  @Autowired
//  @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
//  private AbstractRunner runner; 

    public static void main(String[] args) {
        log.info("DailyCashReportApplication running...");
    
        new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
        
    }

    @Override
    public void run(String... args) throws Exception {
        //runner.run(args);
    }
}

DcrConfig.java

@Configuration
public class DcrConfig {

    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // sftp

    @Value("${sftp.in.host}")
    @Getter
    private String host;

    @Value("${sftp.in.port}")
    private int port;

    @Value("${sftp.in.user}")
    private String user;

    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;



    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    @Value("${sftp.in.chmod}")
    private String chmod;

    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;

//  @Value("${sftp.in.poller.fixedDelay}") @Getter
//  private int pollerFixedDelay;

    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    @Autowired
    private ResourceLoader resourceLoader;
    
    @Bean(name = "downloadSftpSessionFactory")
    public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    
    
    @Bean
    public IntegrationFlow fromFile() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
                                        .preventDuplicates(false)
                                        .patternFilter("*.txt"), 
                                        e -> e.poller(Pollers.fixedDelay(5000).errorChannel("tfrErrors.input"))
                                        .id("fileInboundChannelAdapter"))
                .handle(Files.splitter(true, true))
                .<Object, Class<?>>route(Object::getClass, m->m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }
    
    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        return Files.outboundAdapter(localDir)
                .appendNewLine(true)
                .fileNameExpression("payload.substring(1,4) + '.txt'");
    }
    
    @Bean
    public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
        return f -> f.handle(fileOut); 
    }
    

    
    @Bean
    public IntegrationFlow markers() throws IOException{
        return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
                        e -> e.id("markerFilter"))
                .publishSubscribeChannel(s -> s

                        // first trigger file flushes
                        .subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
                                .trigger("fileOut", e -> e.id("flusher")))

                        // send the first file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the second file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/006.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the third file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/009.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })


                        );
    }
    
    @Bean
    public IntegrationFlow tfrErrors() {
        return f -> f
//              .enrichHeaders(Mail.headers()
//                      .subject("File split and transfer failed")
//                      .from("foo@bar")
//                      .toFunction(m -> new String[] { "bar@baz"} ))
//              .enrichHeaders(h -> h.header(EMAIL_SUCCESS_SUFFIX, ".failed")
//                  .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.failedMessage.headers['"
//                          + FileHeaders.ORIGINAL_FILE + "']"))
//              .<MessagingException, String>transform(p->p.getFailedMessage().getPayload()+"\n"+getStackTraceAsString(p))
                .channel("toMail.input");
    }
    
    
    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
    

    
    @Bean
    public SessionFactory<LsEntry> ftp() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);

//      File keyFile = keyFileResource.getFile();
//      try (BufferedReader br = new BufferedReader(new FileReader(keyFile))) {
//             String line;
//             while ((line = br.readLine()) != null) {
//                 System.out.println(line);
//             }
//          }
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }
    
    @Bean
    public MethodInterceptor afterMailAdvice() {
        return invocation -> {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            MessageHeaders headers = message.getHeaders();
            File originalFile = headers.get(FileHeaders.ORIGINAL_FILE, File.class);
            try {
                invocation.proceed();
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)));
            }
            catch(Exception e) {
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)+"email.failed"));
            }
            return null;
        };
    }
    
}

application-dev.yml

spring:
  application:
    name: daily-cash-report-application

sftp:
  in:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/in 
    tmpDir: 
    localDir: tmp/out 
    maxFetchSize: 1    
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
    poller:
      fixedDelay: 10000
    file:
      filter: A.B.*    #TODO: calibrate once spec is known
  out:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/out  
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
  
file: 
  out:   
    targetDir: 
    tmpDir: 

DailyCashReportApplicationTest.java

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = { 
        DailyCashReportApplication.class,
        DcrConfig.class,
        })
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {

    @Test
    @SneakyThrows
    void test()  {
        
        File in = new File("tmp/in/", "foo");
        FileOutputStream fos = new FileOutputStream(in);
        fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
        fos.close();
        in.renameTo(new File("tmp/in/", "foo.txt"));
        
        File out = new File("tmp/out/002.txt");
        int n = 0;
        while (n++ < 100 && (!out.exists() || out.length() < 12)) {
            Thread.sleep(100);
        }
        assertThat(out.exists()).isTrue();
        
    }

}

The above test fails because the inbound file channel adapter does not seem to pick up the file dropped into the tmp/in directory, so the tmp/out folder stays empty.

What is missing? And is there a way to attach some logging so that I can see more clearly what is going on in the messaging infrastructure?
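For the logging part, what I had in mind is something like the log() operator from the Java DSL dropped into the flow, roughly as below. This is an untested sketch that reuses the same fields and imports as DcrConfig above; the "dcr.flow" category is just a name I made up:

    @Bean
    public IntegrationFlow fromFile() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
                                        .preventDuplicates(false)
                                        .patternFilter("*.txt"),
                                        e -> e.poller(Pollers.fixedDelay(5000).errorChannel("tfrErrors.input"))
                                        .id("fileInboundChannelAdapter"))
                // log every message that leaves the inbound adapter before it is split
                .log(LoggingHandler.Level.DEBUG, "dcr.flow")
                .handle(Files.splitter(true, true))
                .<Object, Class<?>>route(Object::getClass, m -> m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }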

Perhaps the following line in the test is the culprit that disables the desired behavior:

@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")

I am not sure why it is there, but commenting it out leads to the following error:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'tmp' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:223)

Update: it looks like those quote marks in the SpEL expression matter; changing it to the following seems to make the file go through the pipeline:

    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        return Files.outboundAdapter("'"+localDir+"'")
                .appendNewLine(true)
                .fileNameExpression("payload.substring(1,4) + '.txt'");
    }
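Presumably the Files.outboundAdapter overload that takes a java.io.File instead of a String expression would avoid the quoting altogether; an untested sketch:

    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        // the directory is passed as a File, so no SpEL evaluation of the directory name is involved
        return Files.outboundAdapter(new File(localDir))
                .appendNewLine(true)
                .fileNameExpression("payload.substring(1,4) + '.txt'");
    }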

Also, what is the explanation of the following expression in the transformer:

"'tmp/out/.*\\.txt'"

It sounds like a wildcard for all text files in that particular directory, but why are all those extra dots and backslashes needed? Is this described in detail somewhere in the documentation?

Thanks.

spring-boot spring-integration messaging spring-integration-dsl
1 Answer
See the Javadoc of that factory method:

/**
 * Create a {@link FileWritingMessageHandlerSpec} builder for the one-way
 * {@code FileWritingMessageHandler}.
 * @param directoryExpression the SpEL expression to evaluate target directory for writing files.
 * @return the {@link FileWritingMessageHandlerSpec} instance.
 */
public static FileWritingMessageHandlerSpec outboundAdapter(String directoryExpression) {
So, that parameter is parsed as a SpEL expression, and since you rely on a literal for the directory name, it has to be quoted.
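A quick side-by-side sketch of how that String argument is interpreted; the class, the bean names and the header-based variant are made up purely for illustration:

@Configuration
public class OutboundDirectoryExamples {

    // evaluated as a SpEL property path against the Message -> fails with EL1008E at runtime
    @Bean
    public FileWritingMessageHandlerSpec unquoted() {
        return Files.outboundAdapter("tmp/out");
    }

    // a SpEL String literal -> always resolves to the fixed directory tmp/out
    @Bean
    public FileWritingMessageHandlerSpec quoted() {
        return Files.outboundAdapter("'tmp/out'");
    }

    // the expression can also compute the directory per message, e.g. from a header
    @Bean
    public FileWritingMessageHandlerSpec perMessage() {
        return Files.outboundAdapter("headers['targetDir']");
    }
}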

The piece of code you are asking about:

// first trigger file flushes
.subscribe(sf -> sf.transform("'/tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
        .trigger("fileOut", e -> e.id("flusher")))
is for the MessageTriggerAction implemented by the FileWritingMessageHandler. See also the trigger() and FileWritingMessageHandler Javadocs and the documentation:
https://docs.spring.io/spring-integration/reference/file/writing.html#file-flushing

Essentially, we need that pattern from the transformer for the DefaultFlushPredicate in the FileWritingMessageHandler.
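As for the extra dots and backslashes in the question: the outer single quotes make it a SpEL String literal, and what remains is a plain Java regular expression, where .* means "any characters" and \\. is a Java-escaped literal dot before txt. A hypothetical stand-alone check of just that pattern (not Spring Integration code):

import java.util.regex.Pattern;

public class TriggerPatternCheck {

    public static void main(String[] args) {
        // after SpEL strips the outer quotes, the trigger payload is this plain regex
        Pattern flushPattern = Pattern.compile("tmp/out/.*\\.txt");

        // as I understand it, the flush predicate matches the pattern against each open file's path
        System.out.println(flushPattern.matcher("tmp/out/002.txt").matches());  // true
        System.out.println(flushPattern.matcher("tmp/out/002.csv").matches());  // false
    }
}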
