Spring Integration sftp 新文件不会轮询,直到消息处理程序遍历本地缓存的所有文件列表

问题描述 投票:0回答:1
    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
//      if (sftpPrivateKey != null) {
//          factory.setPrivateKey(sftpPrivateKey);
//          factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
//      }
        factory.setPassword(sftpPassword);
        factory.setAllowUnknownKeys(true);

        return new CachingSessionFactory<>(factory);
    }
    

SftpInboundConfig.java

package com.shail.sftp.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;


import org.springframework.integration.file.filters.AcceptOnceFileListFilter;

import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;

import org.springframework.messaging.MessageHandler;

import java.io.File;

@Configuration
public class SftpInboundConfig {

    @Autowired
    private SessionFactory session;
    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(session);
        synchronizer.setDeleteRemoteFiles(true); 
        synchronizer.setRemoteDirectory("/home/shail/sftp_dir/patient/");
        synchronizer.setFilter(new SftpSimplePatternFileListFilter("*.*")); // File filter pattern
        return synchronizer;
    }


    @Bean
    @InboundChannelAdapter(value = "sftpChannel", poller = @Poller(fixedRate = "60000")) // Poll every 60 seconds
    public MessageSource<File> sftpInboundAdapter() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("src/main/resources/localDownload/patient/"));
        source.setAutoCreateLocalDirectory(true);
        //source.setLocalFilter(new AcceptOnceFileListFilter<>());
        source.setMaxFetchSize(-1); // Set to a higher value if you want to fetch multiple files at a time
        return source;
    }



    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler sftpMessageHandler() {
        return message -> {
            // Handle the downloaded file (e.g., process or save it)
            File file = (File) message.getPayload();
            System.out.println("Received file: " + file.getName() + "  @  " + LocalDateTime.now());
        };
    }
}

应用程序启动时,在第一次轮询期间,所有文件都会下载到本地目录中;但消息处理程序在每个轮询中接收一个文件,即在控制台中每隔 1 分钟我就会看到日志

Received file: Product_1_20231107162209.json  @  2023-11-07T17:01:16.124739600
.
如果在 sftp 服务器中添加新文件,则服务(代码)不会下载新文件,直到
MessageHandler
浏览完初始缓存中下载的所有文件列表。

请参阅下面的日志和文件资源管理器屏幕截图,了解有关计时行为的更多详细信息

文件资源管理器

控制台日志

我正在尝试做的事情:

我想实现入站适配器,以便在每次轮询中 sftp 服务器上的所有文件都应该下载到本地目录,这意味着当第一次轮询发生在 17:12 时,前 5 个文件应该在 17:14 被下载当第二组文件添加到 sftp 服务器上时,它们应该在 17:15 全部下载完毕,而不是等到 17:17 让 MessageHandler 浏览 17:12 收到的文件列表。您能否建议如何处理这个问题吗?

我尝试过的:

我尝试增加减少 maxFetchSize 并尝试删除

AcceptOnceFileListFilter
但这没有帮助。

java spring spring-integration spring-integration-sftp
1个回答
0
投票

@Poller
@InboundChannelAdapter
每个任务轮询一条消息。考虑像
maxMessagesPerPoll = "-1"
那样配置它。这样,所有可用文件都将在单个轮询任务中处理。然后,由于没有本地文件,它将在下一个轮询周期向 SFTP 请求更多文件。在您的情况下,大约一分钟后,
fixedRate
意味着在该轮询任务开始后立即开始新的轮询任务。

© www.soinside.com 2019 - 2024. All rights reserved.