如何使用netty客户端获取服务器响应

问题描述 投票:12回答:3

我想写一个基于netty的客户端。它应该有方法public String send(String msg);哪个应该从服务器或某个未来返回响应 - doesen't不重要。它也应该是多线程的。像这样:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

我不知道如何在调用writeAndFlush()之后从服务器检索响应;我该怎么办?

我也使用Netty 4.0.18.Final

java client-server netty
3个回答
12
投票

为方法返回Future<String>很简单,我们将实现以下方法签名:

public Futute<String> sendMessage(String msg) {

当您熟悉异步编程结构时,相对容易做到。为了解决设计问题,我们将执行以下步骤:

  1. 写入消息时,将Promise<String>添加到ArrayBlockingQueue<Promise> 这将作为最近发送的消息的列表,并允许我们更改我们的Future<String>对象返回结果。
  2. 当消息返回到处理程序时,将其解析为Queue的头部 这使我们能够获得正确的未来变化。
  3. 更新Promise<String>的状态 我们调用promise.setSuccess()来最终设置对象的状态,这将传播回未来的对象。

示例代码

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    private ChannelHandlerContext ctx;
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        super.channelInactive(ctx);
        synchronized(this){
            Promise<String> prom;
            Exception err = null;
            while((prom = messageList.poll()) != null) 
                prom.setFailure(err != null ? err : 
                    err = new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null) 
            throw new IllegalStateException();
        return sendMessage(message, ctx.executor().newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) { 
                // Connection open and message accepted
                ctx.writeAndFlush(message).addListener();
            } else { 
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                 messageList.poll().setSuccess(msg);
            }
        }
    }
}

文档细分

  • private ChannelHandlerContext ctx; 用于存储我们对ChannelHandlerContext的引用,我们使用它来创建promises
  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(); 我们将过去的消息保留在此列表中,以便我们可以更改未来的结果
  • public void channelActive(ChannelHandlerContext ctx) 当连接变为活动状态时由netty调用。在这里初始化变量。
  • public void channelInactive(ChannelHandlerContext ctx) 当连接变为非活动状态时由netty调用,由于错误或正常连接关闭。
  • protected void messageReceived(ChannelHandlerContext ctx, String msg) 当新消息到达时由netty调用,这里挑出队列的头部,然后我们调用setsuccess。

警告提醒

当使用期货时,你需要注意一件事,如果未来尚未完成,不要从1个网络线程中调用get(),不遵循这个简单的规则将导致死锁或BlockingOperationException


3
投票

您可以在netty项目中找到该示例。我们可以将结果保存到最后一个处理程序的自定义字段中。在下面的代码中,它是我们想要的handler.getFactorial()。

请参考http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all

factorial client.Java

public final class FactorialClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new FactorialClientInitializer(sslCtx));

            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                (FactorialClientHandler) f.channel().pipeline().last();

            // Print out the answer.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {

    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}

0
投票

调用channel.writeAndFlush(msg);已经返回ChannelFuture。要处理此方法调用的结果,您可以像以下一样向未来添加一个侦听器:

future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
        // Perform post-closure operation
        // ...
    }
}); 

(这取自Netty文档,请参阅:Netty doc

© www.soinside.com 2019 - 2024. All rights reserved.