Java 21 虚拟线程似乎会阻止外部服务调用上的载体线程

问题描述 投票:0回答:1

我有一个 Spring Boot 微服务,它使用来自 RabbitMQ 的消息、撰写电子邮件并将其发送到 SMTP 服务器。 它由以下组件组成:

  1. 撰写电子邮件端的电子邮件发送者将其发送到 SMTP 服务器:
@Service
@RequiredArgsConstructor
public class MessageSender {

    private final JavaMailSender sender;

    public void sendMessage(final RabbitEmailDto emailDto) {
        MimeMessage message = sender.createMimeMessage();
        message.setRecipients(Message.RecipientType.TO, emailDto.getTo());

        MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
        helper.setSubject(emailDto.getData().getEmail().getSubject());
        helper.setText(emailDto.getHtml(), true);
        helper.setFrom(emailDto.getFrom());

        sender.send(message);
    }
}
  1. 消息处理器获取 RabbitMQ 消息列表,每个消息在单独的虚拟线程中调用消息发送者,并返回电子邮件发送结果的 futures 列表
@Service
@RequiredArgsConstructor
public class MessageProcessor {

    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final MessageSender messageSender;
    private final Jackson2JsonMessageConverter converter;

    public List<MessageProcessingFuture> processMessages(List<Message> messages) {
        List<MessageProcessingFuture> processingFutures = new ArrayList<>();

        for (Message message : messages) {
            MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
                    message.getMessageProperties().getDeliveryTag(),
                    processMessageAsync(message, executor)
            );
            processingFutures.add(messageProcessingFuture);
        }

        return processingFutures;
    }

    private Future<?> processMessageAsync(final Message message) {
        RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
        MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();

        return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
    }
}
  1. RabbitMQ 消息监听器,它使用来自 Rabbit 队列的消息,将它们传递给处理器,然后根据 Future.get() 是否抛出异常或执行操作,通过向 RabbitMQ 发送确认或拒绝来处理从处理器获得的 futures不是。
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {

    private final MessageProcessor messageProcessor;

    @Override
    @MeasureExecutionTime
    public void onMessageBatch(final List<Message> messages, final Channel channel) {
        messageProcessor.processMessages(messages)
                .forEach(processingFuture -> processFuture(processingFuture, channel));
    }

    private void processFuture(final MessageProcessingFuture future, final Channel channel) {
        try {
            future.deliveryFuture().get();
            channel.basicAck(future.deliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(future.deliveryTag(), false);
        }
    }
}

我可以在日志中看到

MessageSender.sendMessage
方法确实在虚拟线程中执行,标识为
VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1

我可以看到我们的生产服务器上有 4 个工作人员。 (我是否正确地认为这些工作人员是实际的平台线程或载体线程?)

我还发现,

MessageSender.sendMessage
方法通常需要大约 1 秒才能完成,其中大部分时间都花在等待 SMTP 服务器的响应上。

根据我对虚拟线程的了解,我预计处理一批 100 条消息(这是我为

BatchMessageListener
配置的批量大小)将花费大约 1 秒,因为平台线程不会阻塞对SMTP 服务器。这 4 个平台线程将在 100 个虚拟线程之间共享,从而有效地允许 100 个几乎同时调用 SMTP 服务器。

但是,在实践中,我观察到一次处理 4 条消息,处理所有 100 条消息大约需要 25 秒。

在我的计算机上进行本地测试时,我故意在

Thread.sleep(1000);
中的
sender.send(message);
行之前添加
MessageSender
来引入 1 秒延迟,以模拟网络延迟。当时,一批 100 条消息确实在大约 1 秒内处理完毕,尽管根据日志我只有 10 个承载线程。

我很困惑。为什么运营商线程不会阻塞

Thead.sleep
调用,而是阻塞外部服务的调用?我做错了什么吗?

java performance blocking virtual-threads
1个回答
0
投票

固定

当虚拟线程未从其承载线程中卸下时,听起来就像固定。有关如何检测固定的指南,请参阅 Todd Ginsberg 撰写的文章 Java Virtual Thread Pinning。他描述了如何通过 (1) 日志记录和 (2) 使用 JDK Flight Recorder (JFR)(可通过 JDK Mission Control (JMC) 查看)来检测固定。

固定通常是由于:

  • 在长时间运行的代码周围使用同步。
  • 通过JNI等调用本机代码

您后来评论:

我看到我使用的 org.springframework.mail.javamail.JavaMailSender 在发送电子邮件时调用了多个同步方法。在这种情况下,没有什么可以从虚拟步骤中受益,我的理解是否正确?

正确。

虚拟线程不适合涉及长时间运行的代码(

synchronized
)或本机(JNI 等)的任务。在这两种情况下,JVM 的虚拟线程调度程序都无法看到此类代码何时被阻塞,因此它仍然分配给平台载体线程。

对于偶尔遇到的情况来说,这种情况没什么大不了的。但对于重复或持续的遭遇,这种情况意味着您将从虚拟线程中获得很少或根本没有好处,并且虚拟线程实际上可能会损害整体性能。

Project Loom 团队正在继续尝试寻找解决

synchronized
限制的方法,但在 Java 21 和 22 中尚未取得进展。

synchronized
替换为
ReentrantLock

如果

synchronized
代码是您自己的,则将任何长时间运行的
synchronized
替换为
ReentrantLock
,以通过虚拟线程重新获得效率。

有些人将此指导误解为“将 synchronized

all
的使用替换为
ReentrantLock
”。这是没有必要的。 仅需要修改长时间运行的
synchronized
代码。
简短的代码(例如对通常可用的变量的保护访问)可以保留在适当的位置,因为在
synchronized
代码中花费的时间很少。

如果无法修改长时间运行的

synchronized
任务的源代码,则使用平台线程而不是虚拟线程。但请记住,如果这些任务具有多个可以多线程的子任务,则平台线程中的任务可能会受益于使用虚拟线程运行这些子任务。

© www.soinside.com 2019 - 2024. All rights reserved.