我有一个 Spring Boot 微服务,它使用来自 RabbitMQ 的消息、撰写电子邮件并将其发送到 SMTP 服务器。 它由以下组件组成:
@Service
@RequiredArgsConstructor
public class MessageSender {
private final JavaMailSender sender;
public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());
MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());
sender.send(message);
}
}
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;
public List<MessageProcessingFuture> processMessages(List<Message> messages) {
List<MessageProcessingFuture> processingFutures = new ArrayList<>();
for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}
return processingFutures;
}
private Future<?> processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();
return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {
private final MessageProcessor messageProcessor;
@Override
@MeasureExecutionTime
public void onMessageBatch(final List<Message> messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}
private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
我可以在日志中看到
MessageSender.sendMessage
方法确实在虚拟线程中执行,标识为 VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1
。
我可以看到我们的生产服务器上有 4 个工作人员。 (我是否正确地认为这些工作人员是实际的平台线程或载体线程?)
我还发现,
MessageSender.sendMessage
方法通常需要大约 1 秒才能完成,其中大部分时间都花在等待 SMTP 服务器的响应上。
根据我对虚拟线程的了解,我预计处理一批 100 条消息(这是我为
BatchMessageListener
配置的批量大小)将花费大约 1 秒,因为平台线程不会阻塞对SMTP 服务器。这 4 个平台线程将在 100 个虚拟线程之间共享,从而有效地允许 100 个几乎同时调用 SMTP 服务器。
但是,在实践中,我观察到一次处理 4 条消息,处理所有 100 条消息大约需要 25 秒。
在我的计算机上进行本地测试时,我故意在
Thread.sleep(1000);
中的 sender.send(message);
行之前添加 MessageSender
来引入 1 秒延迟,以模拟网络延迟。当时,一批 100 条消息确实在大约 1 秒内处理完毕,尽管根据日志我只有 10 个承载线程。
我很困惑。为什么运营商线程不会阻塞
Thead.sleep
调用,而是阻塞外部服务的调用?我做错了什么吗?
当虚拟线程未从其承载线程中卸下时,听起来就像固定。有关如何检测固定的指南,请参阅 Todd Ginsberg 撰写的文章 Java Virtual Thread Pinning。他描述了如何通过 (1) 日志记录和 (2) 使用 JDK Flight Recorder (JFR)(可通过 JDK Mission Control (JMC) 查看)来检测固定。
固定通常是由于:
您后来评论:
我看到我使用的 org.springframework.mail.javamail.JavaMailSender 在发送电子邮件时调用了多个同步方法。在这种情况下,没有什么可以从虚拟步骤中受益,我的理解是否正确?
正确。
虚拟线程不适合涉及长时间运行的代码(
synchronized
)或本机(JNI 等)的任务。在这两种情况下,JVM 的虚拟线程调度程序都无法看到此类代码何时被阻塞,因此它仍然分配给平台载体线程。
对于偶尔遇到的情况来说,这种情况没什么大不了的。但对于重复或持续的遭遇,这种情况意味着您将从虚拟线程中获得很少或根本没有好处,并且虚拟线程实际上可能会损害整体性能。
Project Loom 团队正在继续尝试寻找解决
synchronized
限制的方法,但在 Java 21 和 22 中尚未取得进展。
synchronized
替换为 ReentrantLock
如果
synchronized
代码是您自己的,则将任何长时间运行的 synchronized
替换为 ReentrantLock
,以通过虚拟线程重新获得效率。
有些人将此指导误解为“将 synchronized
的
all的使用替换为
ReentrantLock
”。这是没有必要的。 仅需要修改长时间运行的 synchronized
代码。 简短的代码(例如对通常可用的变量的保护访问)可以保留在适当的位置,因为在 synchronized
代码中花费的时间很少。
如果无法修改长时间运行的
synchronized
任务的源代码,则使用平台线程而不是虚拟线程。但请记住,如果这些任务具有多个可以多线程的子任务,则平台线程中的任务可能会受益于使用虚拟线程运行这些子任务。