我正在测试 SI 示例 中的以下示例设置,以了解并使其适应类似的任务集。我不需要邮件功能,但其余部分似乎足以进行实验。
DailyCashReportApplication.java
@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication implements CommandLineRunner {
private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
// sftp
@Value("${sftp.in.host}")
@Getter
private String host;
@Value("${sftp.in.port}")
private int port;
@Value("${sftp.in.user}")
private String user;
@Value("${sftp.in.privateKey}")
@Getter
private String privateKeyLocation;
@Value("${sftp.in.remoteDir}")
@Getter
private String remoteDir;
@Value("${sftp.in.localDir}")
@Getter
private String localDir;
@Value("${sftp.in.chmod}")
private String chmod;
@Value("${sftp.in.maxFetchSize}")
@Getter
private int maxFetchSize;
@Value("${sftp.in.file.filter}")
private String fileFilter;
@Autowired
private ResourceLoader resourceLoader;
// @Autowired
// @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
// private AbstractRunner runner;
public static void main(String[] args) {
log.info("DailyCashReportApplication running...");
new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
//runner.run(args);
}
DcrConfig.java
@Configuration
public class DcrConfig {
private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
// sftp
@Value("${sftp.in.host}")
@Getter
private String host;
@Value("${sftp.in.port}")
private int port;
@Value("${sftp.in.user}")
private String user;
@Value("${sftp.in.privateKey}")
@Getter
private String privateKeyLocation;
@Value("${sftp.in.remoteDir}")
@Getter
private String remoteDir;
@Value("${sftp.in.localDir}")
@Getter
private String localDir;
@Value("${sftp.in.chmod}")
private String chmod;
@Value("${sftp.in.maxFetchSize}")
@Getter
private int maxFetchSize;
// @Value("${sftp.in.poller.fixedDelay}") @Getter
// private int pollerFixedDelay;
@Value("${sftp.in.file.filter}")
private String fileFilter;
@Autowired
private ResourceLoader resourceLoader;
@Bean(name = "downloadSftpSessionFactory")
public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {
Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPrivateKey(keyFileResource);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public IntegrationFlow fromFile() {
return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
.preventDuplicates(false)
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(5000).errorChannel("tfrErrors.input"))
.id("fileInboundChannelAdapter"))
.handle(Files.splitter(true, true))
.<Object, Class<?>>route(Object::getClass, m->m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
}
@Bean
public FileWritingMessageHandlerSpec fileOut() {
return Files.outboundAdapter(localDir)
.appendNewLine(true)
.fileNameExpression("payload.substring(1,4) + '.txt'");
}
@Bean
public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
return f -> f.handle(fileOut);
}
@Bean
public IntegrationFlow markers() throws IOException{
return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
e -> e.id("markerFilter"))
.publishSubscribeChannel(s -> s
// first trigger file flushes
.subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
.trigger("fileOut", e -> e.id("flusher")))
// send the first file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
// send the second file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/006.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
// send the third file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/009.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
);
}
@Bean
public IntegrationFlow tfrErrors() {
return f -> f
// .enrichHeaders(Mail.headers()
// .subject("File split and transfer failed")
// .from("foo@bar")
// .toFunction(m -> new String[] { "bar@baz"} ))
// .enrichHeaders(h -> h.header(EMAIL_SUCCESS_SUFFIX, ".failed")
// .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.failedMessage.headers['"
// + FileHeaders.ORIGINAL_FILE + "']"))
// .<MessagingException, String>transform(p->p.getFailedMessage().getPayload()+"\n"+getStackTraceAsString(p))
.channel("toMail.input");
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
@Bean
public SessionFactory<LsEntry> ftp() throws IOException {
Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);
// File keyFile = keyFileResource.getFile();
// try (BufferedReader br = new BufferedReader(new FileReader(keyFile))) {
// String line;
// while ((line = br.readLine()) != null) {
// System.out.println(line);
// }
// }
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPrivateKey(keyFileResource);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public MethodInterceptor afterMailAdvice() {
return invocation -> {
Message<?> message = (Message<?>) invocation.getArguments()[0];
MessageHeaders headers = message.getHeaders();
File originalFile = headers.get(FileHeaders.ORIGINAL_FILE, File.class);
try {
invocation.proceed();
originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)));
}
catch(Exception e) {
originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)+"email.failed"));
}
return null;
};
}
}
application-dev.yml
spring:
application:
name: daily-cash-report-application
sftp:
in:
host: eafdev
port: 22
user: eafdev
remoteDir: tmp/in
tmpDir:
localDir: tmp/out
maxFetchSize: 1
privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
chmod: 664
poller:
fixedDelay: 10000
file:
filter: A.B.* #TODO: calibrate once spec is known
out:
host: eafdev
port: 22
user: eafdev
remoteDir: tmp/out
privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
chmod: 664
file:
out:
targetDir:
tmpDir:
DailyCashReportApplicationTest.java
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = {
DailyCashReportApplication.class,
DcrConfig.class,
})
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {
@Test
@SneakyThrows
void test() {
File in = new File("tmp/in/", "foo");
FileOutputStream fos = new FileOutputStream(in);
fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
fos.close();
in.renameTo(new File("tmp/in/", "foo.txt"));
File out = new File("tmp/out/002.txt");
int n = 0;
while (n++ < 100 && (!out.exists() || out.length() < 12)) {
Thread.sleep(100);
}
assertThat(out.exists()).isTrue();
}
}
上述测试失败,因为入站文件通道适配器似乎没有拾取放入
tmp/in
目录中的消息,因此 tmp/out
文件夹为空。
缺少什么?有没有办法附加一些日志记录功能,以便更清楚地查看消息传递基础设施中发生的情况?
也许测试中的以下行是禁用所需行为的罪魁祸首:
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
不知道为什么它在那里,但将其注释掉会导致以下错误:
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'tmp' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:223)
更新: 看起来 SPeL 表达式中的那些勾号很重要,更改为以下内容似乎可以使其通过管道发送文件:
@Bean
public FileWritingMessageHandlerSpec fileOut() {
return Files.outboundAdapter("'"+localDir+"'")
.appendNewLine(true)
.fileNameExpression("payload.substring(1,4) + '.txt'");
}
另外,变压器中以下表达式的解释是什么:
"'tmp/out/.*\\.txt'"
听起来像是该特定目录中所有文本文件的通配符,但为什么需要所有这些额外的点和斜杠?一些文档中有详细描述吗?
谢谢。
/**
* Create a {@link FileWritingMessageHandlerSpec} builder for the one-way {@code FileWritingMessageHandler}.
* @param directoryExpression the SpEL expression to evaluate target directory for writing files.
* @return the {@link FileWritingMessageHandlerSpec} instance.
*/
public static FileWritingMessageHandlerSpec outboundAdapter(String directoryExpression) {
因此,该参数被解析为 SpEL 表达式,并且由于您依赖于目录名称的文字,因此必须将其引用。您正在谈论的那部分代码:
// first trigger file flushes
.subscribe(sf -> sf.transform("'/tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
.trigger("fileOut", e -> e.id("flusher")))
用于在 MessageTriggerAction
上实现的
FileWritingMessageHandler
。另请参阅
trigger()
和
FileWritingMessageHandler
Javadocs 和文档:https://docs.spring.io/spring-integration/reference/file/writing.html#file-flushing 本质上,我们需要来自变压器的模式,用于
DefaultFlushPredicate
中的
FileWritingMessageHandler
。