带有虚拟线程和@Async的Spring kafka

问题描述 投票:0回答:1

我查看了

spring-kafka
文档,但没有提到将
@KafkaListener
@Async
一起使用,考虑到虚拟线程。

例如,在 Spring Boot 的配置中我们设置:

spring.threads.virtual.enabled = true

然后我们会有一个用 @KafkaListener 和 @Async 注解的方法:

  @Async
  @KafkaListener(topics = "some-topic")
  public void consume(ConsumerRecord<String, String> record) {
      //code here
  }

考虑到单个分区和单个消费者且没有批处理的简单场景,我注意到消耗的每个记录都由不同的虚拟线程消耗。

当我们删除

@Async
时,它会变成“正常”
@KafkaListener
行为,单个线程将按顺序消耗所有记录。

现在,我完全意识到在这种情况下

@Async
会阻止记录按顺序被消耗,但除此之外,这样做还有什么陷阱吗?或者有什么好处?

我只是想知道这是否更具可扩展性,搭载虚拟线程来消费每条消息,而不是通常每个消费者一个线程。

由于 spring-kafka 的文档中没有提到虚拟线程,我想知道使用

@Async
在这种情况下是否是一件危险的事情。

有人尝试过这个吗?干杯。

java spring-boot apache-kafka spring-kafka virtual-threads
1个回答
0
投票

虽然与 Kafka 没有直接关系,但下面的一个非常基本的骨架 Executor 可能允许您在平台、虚拟和线程内模式之间切换,看看什么最适合您:

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class ThreadingModeExecutor implements TaskExecutor {

    public enum ThreadingMode {
        SAME_THREAD,
        PLATFORM,
        VIRTUAL
    }

    private volatile ThreadingMode threadingMode = ThreadingMode.SAME_THREAD;
    private final SimpleAsyncTaskExecutor platformExecutor;
    private final SimpleAsyncTaskExecutor virtualExecutor;

    public ThreadingModeExecutor() {
        platformExecutor = new SimpleAsyncTaskExecutor();
        virtualExecutor = new SimpleAsyncTaskExecutor();
        virtualExecutor.setVirtualThreads(true);
    }

    public ThreadingModeExecutor(ThreadingMode threadingMode) {
        this();
        this.threadingMode = threadingMode;
    }

    @Override
    public void execute(Runnable task) {
        switch (threadingMode) {
        case ThreadingMode.SAME_THREAD:
            task.run();
            break;
        case ThreadingMode.PLATFORM:
            platformExecutor.execute(task);
            break;
        case ThreadingMode.VIRTUAL:
            virtualExecutor.execute(task);
        }
    }

    public ThreadingMode getThreadingMode() {
        return threadingMode;
    }

    public void setThreadingMode(ThreadingMode threadingMode) {
        this.threadingMode = threadingMode;
    }

}

@Async
注释的方法将如下所示:

@Async("threadingModeExecutor")
public void seeCastellan() {
    // ...
}

请注意,在这种情况下,Spring Boot

spring.threads.virtual.enabled
被完全忽略,因为 Executor Spring bean 是“手动”创建的。这也意味着该设计可以用于非 Spring Boot、普通 Spring Bean/Context 应用程序,例如基于战争的;
@Async
当然,必须正确配置用法。

该片段假设

ThreadingModeExecutor
类位于扫描路径中,例如通过
@ComponentScan
。或者,可以在
@Configuration
类中创建相应的 Spring bean。但是,Spring bean 名称必须与
value
注释的
@Async
参数(代码片段中的
threadingModeExecutor
)一致。

请注意,在此代码片段中,

SimpleAsyncTaskExecutor
不池化线程,这可能是平台线程的问题。

© www.soinside.com 2019 - 2024. All rights reserved.