我有一个 Spring 集成流程,其中我目前使用 Thread.sleep() 引入了手动条件延迟
我知道这是低效的,现在想使用 spring 集成原生重构整个流程延迟支持
但是我当前的应用程序是单线程的,因此有很多监视/跟踪库在 threadcontext/mdc 中添加其上下文
IntegrationFlow.from("input")
.handle((message, h) -> {
// initial logic which may set thread context
})
.handle((message, h) -> {
// conditional delay logic invoking Thread.delay()
})
.handle((message, h) -> {
// subsequent logic which may rely on the thread context
})
.channel("output")
.get();
通道就是这样定义的
@Bean(name = "input")
public MessageChannel getChannel() {
DirectChannel dc = new DirectChannel();
dc.setComponentName("input");
dc.setDatatypes(ConsumerRecord.class);
return dc;
}
即使是向该通道发送消息的线程也可以在 mdc 中设置某些内容;这是有效的,因为我正在使用
DirectChannel
并且根据我对 DirectChannel
的理解,流程的执行仅发生在调用线程上。
如果我将其重构为类似于以下代码的内容,它会起作用吗
IntegrationFlow.from("input")
.handle((message, h) -> {
// initial logic which may set thread context
})
.delay(d -> {// some delay config})
.handle((message, h) -> {
// subsequent logic which may rely on the thread context
})
.channel("output")
.get();
我的预感是,当消息进入延迟通道时,上下文就会丢失,并且下次分配给该任务的线程将不会记住上下文,因此我的后续逻辑将失败。
在弹簧集成领域实现这一目标的正确方法是什么? 预先感谢。
是的。
DelayHandler
使用 TaskScheduler
通过计划任务延迟这些消息。线程局部变量在计划线程中丢失并不奇怪。
传播线程本地上下文(在您的情况下是 MDC)的最简单方法之一是将其设置到消息头中,然后在延迟器之后的下一步中将其恢复回线程本地。
另一种方法是实现类似于
TaskScheduler
的 DelegatingSecurityContextTaskScheduler
外观,并将其注入到 delay()
函数中。
更新
以下是如何实现 MDC 传播的一些想法:https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async -线程
这里讨论:如何将 MDC 与线程池一起使用?