我在同一台机器上启动 2 个
Channel
实例:第一个实例发送到第二个实例,反之亦然。
然后我执行以下操作:
channel2.pipeline().addAfter(OutboundEncoder.NAME, OutgoingAcknowledgementSilencer.NAME, ackSilencer);
channel1.writeAndFlush(message);
此代码在
main
线程中执行(所以肯定不是 Netty 线程)。
大多数时候,这就像我期望的那样工作:当第二行发送的消息到达时,接收管道(
channel2
)的行为就像添加了ackSilencer
一样。但极少数情况下,我观察到 channel2
的管道不会调用第 1 行添加的处理程序。
https://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
的
ChannelPipeline
的Javadoc说如下:
ChannelHandler 可以随时添加或删除,因为 ChannelPipeline 是线程安全的。例如,您可以在即将交换敏感信息时插入加密处理程序,并在交换后将其删除。但是,当从另一个线程(不是事件循环线程)完成此类插入时,是否可以保证具有“立即”效果?
我将以下内容添加到处理程序中:
final CountDownLatch addedLatch = new CountDownLatch(1);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
addedLatch.countDown();
}
然后我还添加了一个等待调用的方法:
channel2.pipeline().addAfter(OutboundEncoder.NAME, OutgoingAcknowledgementSilencer.NAME, ackSilencer);
ackSilencer.addedLatch.await(1, SECONDS);
channel1.writeAndFlush(message);
这似乎有效,因为我们明确等待添加发生。另一种有效的方法是在相应的 evnet 循环中显式添加处理程序,然后使用
CountDownLatch
CountDownLatch latch = new CountDownLatch(1)
channel2.eventLoop().execute(() -> {
channel2.pipeline().addAfter(OutboundEncoder.NAME, OutgoingAcknowledgementSilencer.NAME, ackSilencer);
latch.countDown();
});
latch.await(1, SECONDS);
channel1.writeAndFlush(message);
但这似乎是 Netty 本身所做的手工操作。