@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
// if (sftpPrivateKey != null) {
// factory.setPrivateKey(sftpPrivateKey);
// factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
// }
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
SftpInboundConfig.java
package com.shail.sftp.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.messaging.MessageHandler;
import java.io.File;
@Configuration
public class SftpInboundConfig {
@Autowired
private SessionFactory session;
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(session);
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setRemoteDirectory("/home/shail/sftp_dir/patient/");
synchronizer.setFilter(new SftpSimplePatternFileListFilter("*.*")); // File filter pattern
return synchronizer;
}
@Bean
@InboundChannelAdapter(value = "sftpChannel", poller = @Poller(fixedRate = "60000")) // Poll every 60 seconds
public MessageSource<File> sftpInboundAdapter() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("src/main/resources/localDownload/patient/"));
source.setAutoCreateLocalDirectory(true);
//source.setLocalFilter(new AcceptOnceFileListFilter<>());
source.setMaxFetchSize(-1); // Set to a higher value if you want to fetch multiple files at a time
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpMessageHandler() {
return message -> {
// Handle the downloaded file (e.g., process or save it)
File file = (File) message.getPayload();
System.out.println("Received file: " + file.getName() + " @ " + LocalDateTime.now());
};
}
}
应用程序启动时,在第一次轮询期间,所有文件都会下载到本地目录中;但消息处理程序在每个轮询中接收一个文件,即在控制台中每隔 1 分钟我就会看到日志
Received file: Product_1_20231107162209.json @ 2023-11-07T17:01:16.124739600
.MessageHandler
浏览完初始缓存中下载的所有文件列表。
请参阅下面的日志和文件资源管理器屏幕截图,了解有关计时行为的更多详细信息
我正在尝试做的事情:
我想实现入站适配器,以便在每次轮询中 sftp 服务器上的所有文件都应该下载到本地目录,这意味着当第一次轮询发生在 17:12 时,前 5 个文件应该在 17:14 被下载当第二组文件添加到 sftp 服务器上时,它们应该在 17:15 全部下载完毕,而不是等到 17:17 让 MessageHandler 浏览 17:12 收到的文件列表。您能否建议如何处理这个问题吗?
我尝试过的:
我尝试增加减少 maxFetchSize 并尝试删除
AcceptOnceFileListFilter
但这没有帮助。
@Poller
的 @InboundChannelAdapter
每个任务轮询一条消息。考虑像 maxMessagesPerPoll = "-1"
那样配置它。这样,所有可用文件都将在单个轮询任务中处理。然后,由于没有本地文件,它将在下一个轮询周期向 SFTP 请求更多文件。在您的情况下,大约一分钟后,fixedRate
意味着在该轮询任务开始后立即开始新的轮询任务。