How do I set the number of threads for a UDP Webflux server?

Problem description (votes: 0, answers: 1)

I am building a UDP server with Webflux 2.1.8 on Spring Boot 2.1.3. It collects data from UDP clients.

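I want it to be built in a fully reactive way, but I am running into a thread pool problem. My application currently runs on only one thread, which is not what I expected. First, I wrote a simple Python client that sends a message and waits for a response, so that I can test my server: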

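import socket

# Assumed values; the original post does not show these definitions
bytesToSend = b"Hello UDP server"
serverAddressPort = ("127.0.0.1", 19001)  # port matches the server below
bufferSize = 1024

# Create a UDP socket at client side
UDPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Send to server using created UDP socket
UDPClientSocket.sendto(bytesToSend, serverAddressPort)
print("Receiving...")
# Block until the server replies; recvfrom returns (data, address)
msgFromServer = UDPClientSocket.recvfrom(bufferSize)
msg = "Message from Server {}".format(msgFromServer[0])

print(msg)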

My Spring Boot UDP server currently looks like this:

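import java.time.Duration;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import reactor.netty.udp.UdpServer;

@SpringBootApplication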
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    CommandLineRunner serverRunner(UdpDecoderHandler udpDecoderHandler, UdpEncoderHandler udpEncoderHandler, UdpHandler udpHandler) {
        return strings -> {
            createUdpServer(udpDecoderHandler, udpEncoderHandler, udpHandler);
        };
    }

    private void createUdpServer(UdpDecoderHandler udpDecoderHandler, UdpEncoderHandler udpEncoderHandler, UdpHandler udpHandler) {
        UdpServer.create()
                .handle((in,out) -> {
                    in.receive().asByteArray().subscribe();
                    return Flux.never();
                })
                .port(19001)
                .doOnBound(conn -> conn
                        .addHandler("decoder", udpDecoderHandler)
                        .addHandler("encoder", udpEncoderHandler)
                        .addHandler("handler", udpHandler)
                )
                .bindNow(Duration.ofSeconds(30));
    }
}

Decoder:

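import java.net.InetSocketAddress;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service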
public class UdpDecoderHandler extends MessageToMessageDecoder<DatagramPacket>  {

    private static final Logger LOGGER = LoggerFactory.getLogger(UdpDecoderHandler.class);

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> out) {
        ByteBuf byteBuf = datagramPacket.content();
        byte[] data = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(data);
        String msg = new String(data);
        LOGGER.info("Decoded data: " + msg);

        // Hand the sender address to the next handler; the decoder base class fires channelRead for each element added to out
        InetSocketAddress s = datagramPacket.sender();
        out.add(s);
    }
}

Encoder:

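import java.net.InetSocketAddress;
import java.util.List;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class UdpEncoderHandler extends MessageToMessageEncoder {

    private static final Logger log = LoggerFactory.getLogger(UdpEncoderHandler.class); // not declared in the original post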

    @Override
    protected void encode(ChannelHandlerContext ctx, Object o, List list) throws Exception {
        InetSocketAddress socketAddress = (InetSocketAddress) o;
        log.debug("Encode function...");
        String msg = "Hello response!";
        DatagramPacket response = new DatagramPacket(Unpooled.copiedBuffer(msg.getBytes()), socketAddress);
        list.add(response);
    }
}

Handler (I added a sleep here to test concurrency):

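import java.net.InetSocketAddress;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class UdpHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(UdpHandler.class); // not declared in the original post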

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Channel Read function");
        InetSocketAddress message = (InetSocketAddress) msg;

        // Deliberate blocking call for the test; it stalls the event loop thread that serves this channel
        Thread.sleep(5000);

        ctx.channel().writeAndFlush(message).addListener((GenericFutureListener<Future<Void>>) future -> {
            if(future.isDone() && future.isSuccess()) {
                log.info("OK");
            } else {
                log.error("error " + future.isDone() + " - " + future.isSuccess());
                if(!future.isSuccess()) {
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

With this in place, I ran my Python script twice at the same time:

python3 udp-client.py & python3 udp-client.py

The two responses show up 5 seconds apart, which means my application is processing them on a single thread.

How can I configure Netty and Webflux to use 4 threads?

spring spring-boot netty spring-webflux
1 Answer
0 votes

Maybe this will help you:

UdpServer.create()
        .runOn(new DefaultEventLoopGroup(5))
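
One caveat worth checking before relying on a bigger event loop group: as far as I understand Netty's threading model, a single bound UDP channel is served by one event loop thread for its whole lifetime, so the 5-second sleep in the handler would still serialize the two requests. The usual way around that is to hand the blocking work to a separate worker pool. The sketch below only illustrates that effect with plain JDK classes. It is not Reactor Netty code, the class and method names are hypothetical, and the sleep is shortened to 200 ms to stand in for the `Thread.sleep(5000)` from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical name; plain JDK only, no Netty or Reactor classes involved.
class BlockingOffloadSketch {

    // Runs `tasks` blocking jobs on a pool of `threads` workers and
    // returns the elapsed wall-clock time in milliseconds.
    static long runTasks(int threads, int tasks, long sleepMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                pending.add(pool.submit(() -> {
                    try {
                        // Stands in for the blocking call inside UdpHandler
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            // Wait until every job has finished
            for (Future<?> f : pending) {
                f.get();
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // One worker: the two jobs run one after the other
        System.out.println("1 thread, 2 tasks: " + runTasks(1, 2, 200) + " ms");
        // Four workers: the two jobs overlap
        System.out.println("4 threads, 2 tasks: " + runTasks(4, 2, 200) + " ms");
    }
}
```

With one worker the two jobs take roughly twice the sleep time, and with four workers they overlap, which mirrors the "5 seconds apart" symptom and the result hoped for. In Netty itself, a comparable effect might be reached by registering the blocking handler with its own executor group through `ChannelPipeline.addLast(EventExecutorGroup, String, ChannelHandler)`. That is an untested suggestion for this setup, not something taken from the question.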