根据其他地方的建议,我试图在Netty管道中并行化我的最终入站处理程序
public final class EchoServer {
private EventLoopGroup group = new NioEventLoopGroup();
private UnorderedThreadPoolEventExecutor workers = new UnorderedThreadPoolEventExecutor(10);
public void start(int port) throws InterruptedException {
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel channel) throws Exception {
channel.pipeline().addLast(workers, new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
System.err.println(packet);
// Simulated database delay that I have to wait to occur before repsonding
Thread.sleep(1000);
ctx.write(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1), packet.sender()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
});
}
});
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public void stop() {
group.shutdownGracefully();
}
}
我有十个客户端同时连接,作为测试,我正在测量处理所有请求的执行时间。正如预期的那样,1秒延迟和顺序执行只需10秒钟。我试图将执行降低到2秒以下,以证明并行处理。
根据我的理解,使用显式分配的执行程序将处理程序添加到管道应该并行化处理程序在执行程序中的线程上工作。
我发现,当我添加并行处理时,我的客户端没有收到响应,而不是看到性能的提高。线程休眠用于模拟将传入数据写入数据库所需的潜在时间。我在做一些明显不对的事吗?
我使用标准的Java并发机制来解决显然缺乏Netty支持并行进行最终端UDP处理的问题。
public final class EchoServer {
private EventLoopGroup group = new NioEventLoopGroup();
private ExecutorService executors = Executors.newFixedThreadPool(10);
public void start(int port) throws InterruptedException {
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class).handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel channel) throws Exception {
channel.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
CompletableFuture.runAsync(() -> {
System.err.println(packet);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1),
packet.sender()));
}, executors);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
});
}
});
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public void stop() {
group.shutdownGracefully();
}
}