我查看了
spring-kafka
文档,但没有提到将 @KafkaListener
与 @Async
一起使用,考虑到虚拟线程。
例如,在 Spring Boot 的配置中我们设置:
spring.threads.virtual.enabled = true
然后我们会有一个用 @KafkaListener 和 @Async 注解的方法:
@Async
@KafkaListener(topics = "some-topic")
public void consume(ConsumerRecord<String, String> record) {
//code here
}
考虑到单个分区和单个消费者且没有批处理的简单场景,我注意到消耗的每个记录都由不同的虚拟线程消耗。
当我们删除
@Async
时,它会变成“正常”@KafkaListener
行为,单个线程将按顺序消耗所有记录。
现在,我完全意识到在这种情况下
@Async
会阻止记录按顺序被消耗,但除此之外,这样做还有什么陷阱吗?或者有什么好处?
我只是想知道这是否更具可扩展性,搭载虚拟线程来消费每条消息,而不是通常每个消费者一个线程。
由于 spring-kafka 的文档中没有提到虚拟线程,我想知道使用
@Async
在这种情况下是否是一件危险的事情。
有人尝试过这个吗?干杯。
虽然与 Kafka 没有直接关系,但下面的一个非常基本的骨架 Executor 可能允许您在平台、虚拟和线程内模式之间切换,看看什么最适合您:
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
@Component
public class ThreadingModeExecutor implements TaskExecutor {
public enum ThreadingMode {
SAME_THREAD,
PLATFORM,
VIRTUAL
}
private volatile ThreadingMode threadingMode = ThreadingMode.SAME_THREAD;
private final SimpleAsyncTaskExecutor platformExecutor;
private final SimpleAsyncTaskExecutor virtualExecutor;
public ThreadingModeExecutor() {
platformExecutor = new SimpleAsyncTaskExecutor();
virtualExecutor = new SimpleAsyncTaskExecutor();
virtualExecutor.setVirtualThreads(true);
}
public ThreadingModeExecutor(ThreadingMode threadingMode) {
this();
this.threadingMode = threadingMode;
}
@Override
public void execute(Runnable task) {
switch (threadingMode) {
case ThreadingMode.SAME_THREAD:
task.run();
break;
case ThreadingMode.PLATFORM:
platformExecutor.execute(task);
break;
case ThreadingMode.VIRTUAL:
virtualExecutor.execute(task);
}
}
public ThreadingMode getThreadingMode() {
return threadingMode;
}
public void setThreadingMode(ThreadingMode threadingMode) {
this.threadingMode = threadingMode;
}
}
用
@Async
注释的方法将如下所示:
@Async("threadingModeExecutor")
public void seeCastellan() {
// ...
}
请注意,在这种情况下,Spring Boot
spring.threads.virtual.enabled
被完全忽略,因为 Executor Spring bean 是“手动”创建的。这也意味着该设计可以用于非 Spring Boot、普通 Spring Bean/Context 应用程序,例如基于战争的; @Async
当然,必须正确配置用法。
该片段假设
ThreadingModeExecutor
类位于扫描路径中,例如通过 @ComponentScan
。或者,可以在 @Configuration
类中创建相应的 Spring bean。但是,Spring bean 名称必须与 value
注释的 @Async
参数(代码片段中的 threadingModeExecutor
)一致。
请注意,在此代码片段中,
SimpleAsyncTaskExecutor
不池化线程,这可能是平台线程的问题。