从多个客户端接收消息到单个服务器寻求建议

问题描述 投票:0回答:0

我正在使用 Netty v4.1.90.Final 开发客户端-服务器应用程序。我对框架还是很陌生,对 Java 和编程也比较陌生。

客户端由一个模拟新闻提要组成,它生成一个包含一个 int 和一个字符串的 POJO。模拟新闻提要通过保持活动的 TCP 连接连接到服务器,并定期发送 POJO。客户端将发送一个启动对象,以便从服务器接收“200”状态代码响应,之后它将开始发送对象供服务器使用。

服务器端包含一个新闻分析器,它将接受 POJO 并针对任意条件集分析字符串和整数。服务器收到启动对象,将其丢弃,并以“200”状态代码进行响应。当一个对象被接收到时,它被放入一个队列中(在这个例子中是一个

LinkedBlockingQueue
。新闻项目分析器对象然后弹出队列中的下一个对象来分析它。

注意: 我在同一台机器上同时运行客户端和服务器,并连接到端口 9001 上的本地主机。

预期结果:

当我运行 2 个或更多客户端实例时,我希望传入的消息将由 Netty 接收并以异步方式传递给我的解码器和

NewsAnalyserHandler
(见下文)。当它们到达
NewsAnalyserHandler
时,我希望解码后的消息将按照 Netty 接收它们的顺序附加到消息队列中,并且队列大小将按比例增加。我希望对象不会被覆盖,因为它被附加到队列中。

实际结果:

当解码的消息到达

NewsAnalyserHandler
时,它们似乎会相互覆盖,具体取决于接收它们的顺序。因此,如果我从 3 个客户端收到 20 条消息,我的队列中最终将有 20 条消息,而不是我期望的 3x20 条消息。

可以在 News Analyser log 的日志中看到这种覆盖行为。

NewsAnalyserHandler

public class NewsAnalyserHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(NewsAnalyserHandler.class);
    private static final String OK_TO_SEND = "200";
    private static final String STOP_SENDING = "507";

    private final BlockingQueue<NewsItem> messageQueue = new LinkedBlockingQueue<>();

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        NewsItem request = (GeneratedNewsItem) msg;

        // If the NewsItem is the initiating object.
        if (request.getHeadline().equals("INIT") && request.getPriorty() == -1) {
            ctx.write(OK_TO_SEND);
            return;
        }

        // If the NewsItem can be inserted into the queue
        if (messageQueue.offer(request)) {
            logger.info("Received news item: {}", request);
            logger.info("Sending 200 response");
            ctx.write(OK_TO_SEND);
        }

        logger.info("Number of messages: {}", messageQueue.size());
        ReferenceCountUtil.release(msg);

    }

    //...

}

我使用的2个解码器:

NewsItemByteDecoder

public class NewsItemByteDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.isReadable()) {
            int strLen = in.readInt();
            byte[] headlineBytes = new byte[strLen];
            in.readBytes(headlineBytes);
            String headline = new String(headlineBytes, StandardCharsets.UTF_8);
            int priority = in.readInt();
            NewsItem ni = GeneratedNewsItem.createNewsItem(priority, headline);
            ctx.fireChannelRead(ni);
        }
    }

}

NewsItemDecoder

public class NewsItemDecoder extends ReplayingDecoder<NewsItem> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int strLen = in.readInt();
        byte[] headlineBytes = new byte[strLen];
        in.readBytes(headlineBytes);
        String headline = new String(headlineBytes, StandardCharsets.UTF_8);
        int priority = in.readInt();
        NewsItem ni = GeneratedNewsItem.createNewsItem(priority, headline);
        out.add(ni);
    }
    
}

我的bootstrapping如下:

NewsAnalyser
引导课程

public final class NewsAnalyser {

    private static final int DEFAULT_PORT = 9001;
    private static final Logger logger = LoggerFactory.getLogger(NewsAnalyser.class);

    private int port;

    public NewsAnalyser(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        int port = (args.length > 0) ? Integer.parseInt(args[0]) : DEFAULT_PORT;

        new NewsAnalyser(port).run();

    }

    public void run() throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NewsItemByteDecoder())
                                         .addLast(new ServerResponseEncoder(),
                                                  new NewsAnalyserHandler());
                        }

                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = b.bind(port).sync();
            logger.info("Starting News Analyser on localhost:{}", port);
            future.channel().closeFuture().sync();
        } finally {
            logger.info("Shutting down News Analyser on localhost:{}", port);
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

我试过同时使用

ReplayingDecoder<T>
ByteToMessageDecoder
,效果完全一样。

我还尝试使用多种不同的数据结构,包括

List
BlockingQueue
concurrentLinkedQueue
。我还尝试将
synchronized
关键字添加到
channelRead()
方法中,并且我尝试使用
ReentrantLock
执行方法主体,但无济于事。 所有这些努力告诉我,我并没有从根本上理解 Netty 如何接收和处理 TCP 流量的概念。或者很可能,我不理解 Java 中关于队列或同步的概念。

任何让这个应用程序按预期工作的建议,甚至是指向可能类似帖子的指针,都将不胜感激。

java queue netty
© www.soinside.com 2019 - 2024. All rights reserved.