动机:我需要在路由到Sftp出站网关之前为DelegatingSessionFactory设置threadKey,然后取消设置threadKey。
根据租户,我需要使用不同的Sftp用户帐户。用户帐户是我的application.yml中的配置问题,我不想为每个新租户编写单独的路由。
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
设置threadKey需要一个Message<?>
,而不仅仅是有效负载和头文件。所以我使用bean是因为它需要一条消息:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
我想把它写成一个lambda,就像在
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
但那并不容易。可以与带有handle()
的Message<T>
一起使用的lambda结束流程,因为它是一个void函数(MessageHandler
功能接口)。另一个lambda是GenericHandler,它不会结束流,但它需要有效负载和头,而不是消息。
这只是一个例子,我时不时地希望我能在qdaxswpoi中使用lambda中的消息而不结束流程。我怎样才能做到这一点?
更新
handle()
不是一个特别适合的例子。由于设置和清除线程密钥应该在sftp调用之前和之后发生,因此建议比在调用之前和之后定义处理程序更合适。
得到它了。 DelegatingSessionFactory
的javadoc说
如果您需要访问整个邮件,请使用
handle()
。
Class参数必须是handle(Class, GenericHandler)
:
Message.class