如何在Netty通道处理程序中安全地执行阻塞操作?

问题描述 投票:4回答:1

我正在构建一个基于Netty的小应用程序,它通过套接字连接(即telnet / ssh)执行I / O操作。我正在使用Netty的ServerBootstrap类启动我的套接字服务器,给它:

  1. 类型为NioEventLoopGroup的事件循环(即不应进行阻塞操作的共享线程池)。
  2. NioServerSocketChannel类型的渠道(我认为这需要与上面的#1相对应)。
  3. 一个非常简单的管道,带有扩展ChannelInboundHandlerAdapter的通道处理程序。

每当从客户端套接字连接接收到命令字符串时,都会调用我的处理程序的channelRead(...)方法,并根据命令返回一些响应字符串。

对于不涉及阻塞操作的命令,一切都很好。但是,有一些命令我现在需要从中读取或写入数据库。那些JDBC调用本身就会阻塞...虽然我可以使用CompletableFuture(或其他)在一个单独的线程中处理它们。

但即使我通过在单独的线程中执行阻塞操作来“滚动我自己的异步”,我也不确定如何将这些生成的线程的结果重新连接回主线程中的Netty通道处理程序。

我看到ChannelHandlerContext类有以下方法:

ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

...作为我目前使用的替代品:

ChannelFuture writeAndFlush(Object msg);

但是我找不到任何文档或指导(甚至是有用的Javadocs)来解释如何在这个用例中使用这个ChannelPromise类型。它的名字暗示它可能是相关的,但它可能不是。毕竟,writeAndFlush方法仍然将传出消息作为其第一个参数...所以,如果您需要将其结果存储在“promise”第二个参数中,那么它有什么用呢?第一个参数?

这里的正确轨道是什么?有没有办法在单独的线程中处理阻塞操作,以便Netty的NioEventLoopGroup不会阻塞?或者这根本不是Netty的工作方式,如果你需要支持阻塞,你应该使用不同的事件循环实现(即为每个客户端套接字连接生成一个单独的线程)。

java multithreading asynchronous netty nio
1个回答
4
投票

如果Netty中的操作需要更长的时间才能完成或阻塞,建议在handler that uses a separate ExecutorGroup中执行该操作,以便不阻止主EventLoop线程。

您可以在管道创建期间指定。

引用一个使用执行程序组进行数据库操作的示例来自ChannelPipeline javadoc

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
 ...

 ChannelPipeline pipeline = ch.pipeline();

 pipeline.addLast("decoder", new MyProtocolDecoder());
 pipeline.addLast("encoder", new MyProtocolEncoder());

 // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
 // in a different thread than an I/O thread so that the I/O thread is not blocked by
 // a time-consuming task.
 // If your business logic is fully asynchronous or finished very quickly, you don't
 // need to specify a group.
 pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
© www.soinside.com 2019 - 2024. All rights reserved.