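I want to write a Netty-based client. It should have a method public String send(String msg); that returns the response from the server, or a future for it; which of the two doesn't matter. It should also be usable from multiple threads. Something like this: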
public class Client {
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
    }

    private Channel channel;

    public Client() throws InterruptedException {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder()).
                        addLast(new StringEncoder()).
                        addLast(new ClientHandler());
            }
        });
        channel = b.connect("localhost", 9091).sync().channel();
    }

    public String sendMessage(String msg) {
        channel.writeAndFlush(msg);
        return ??????????;
    }
}
I don't know how to retrieve the response from the server after calling writeAndFlush(). What should I do?
I am using Netty 4.0.18.Final.
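Returning a Future<String> from the method is simple. We are going to implement the following method signature:
public Future<String> sendMessage(String msg) {
This is relatively easy to do once you are familiar with asynchronous programming constructs. To solve the design problem, we take the following steps:
1. When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise>.
This queue serves as a list of the most recently sent messages and lets us change the result that our Future<String> objects return.
2. When a message arrives back in the handler, resolve it against the head of the Queue.
This lets us pick the correct future to change. It relies on the server answering requests in the order in which they were sent.
3. Update the state of the Promise<String>.
We call promise.setSuccess() to finally set the state of the object, which propagates back to the Future object.
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    private ChannelHandlerContext ctx;
    // Promises of requests that are still waiting for a response, oldest first.
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized (this) {
            // Fail every pending promise, reusing a single exception instance.
            Promise<String> prom;
            Exception err = null;
            while ((prom = messageList.poll()) != null) {
                prom.setFailure(err != null ? err : (err = new IOException("Connection lost")));
            }
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if (ctx == null) {
            throw new IllegalStateException("Channel is not active yet");
        }
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized (this) {
            if (messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException("Connection closed"));
            } else if (messageList.offer(prom)) {
                // Connection open and message accepted.
                // The original listener argument was missing; closing the channel on a
                // failed write is one option, so that channelInactive fails the promises.
                ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // Connection open and message rejected (too many requests in flight)
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    // Netty 4.x names this callback channelRead0 (messageReceived is the Netty 5 alpha name).
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        synchronized (this) {
            // Complete the oldest pending promise; ignore messages nobody is waiting for.
            Promise<String> prom = (messageList != null) ? messageList.poll() : null;
            if (prom != null) {
                prom.setSuccess(msg);
            }
        }
    }
}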
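The three steps can also be tried out without a network connection. The following is only a minimal sketch under stated assumptions: it uses java.util.concurrent.CompletableFuture as a stand-in for Netty's Promise, an unbounded ArrayDeque instead of the bounded ArrayBlockingQueue, and the class PendingReplies with its methods register, onReply and onClose is hypothetical, not part of Netty.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the handler's bookkeeping; no Netty types involved.
class PendingReplies {
    // Futures of requests still waiting for a reply, oldest first; null once closed.
    private Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // Step 1: called when a request is written; remembers a future for its reply.
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending == null) {
            reply.completeExceptionally(new IllegalStateException("Connection closed"));
        } else {
            pending.add(reply);
        }
        return reply;
    }

    // Steps 2 and 3: called when a reply arrives; completes the oldest pending future.
    synchronized void onReply(String msg) {
        CompletableFuture<String> head = (pending == null) ? null : pending.poll();
        if (head != null) {
            head.complete(msg);
        }
    }

    // Called when the connection is lost; fails everything that is still waiting.
    synchronized void onClose() {
        if (pending == null) {
            return;
        }
        IOException err = new IOException("Connection lost");
        for (CompletableFuture<String> f : pending) {
            f.completeExceptionally(err);
        }
        pending = null;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        CompletableFuture<String> first = replies.register();
        CompletableFuture<String> second = replies.register();
        replies.onReply("pong-1"); // resolves the oldest request
        replies.onReply("pong-2"); // resolves the next one
        System.out.println(first.join() + " " + second.join()); // prints "pong-1 pong-2"
    }
}
```

As with the handler, the pairing of replies to requests is purely positional, so it only holds if replies come back in request order.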
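The members of ClientHandler, one by one:
private ChannelHandlerContext ctx;
Stores our reference to the ChannelHandlerContext, which we use to create promises.
private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);
We keep the promises of past messages in this queue so that we can change the result of the corresponding future.
public void channelActive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes active. We initialize our variables here.
public void channelInactive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes inactive, either because of an error or because of a normal connection close.
protected void channelRead0(ChannelHandlerContext ctx, String msg)
Called by Netty when a new message arrives (the method is named messageReceived in the Netty 5 alphas). Here we take the head of the queue and call setSuccess on it.
One thing to watch out for when using futures: do not call get() from one of the Netty threads if the future is not done yet. Failing to follow this simple rule results in either a deadlock or a BlockingOperationException.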
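To illustrate why blocking on an event loop thread is a problem, here is a small sketch that does not use Netty at all. It assumes that a single-threaded ExecutorService behaves like one event loop and that CompletableFuture stands in for Netty's Future; the class name NoBlockingOnLoop is hypothetical. The code running on the loop thread registers a callback instead of calling get(), and only the caller's thread blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NoBlockingOnLoop {
    // Runs the demo and returns what the callback observed.
    static String run() {
        // A single thread stands in for one event loop (assumption for illustration).
        ExecutorService loop = Executors.newSingleThreadExecutor();
        CompletableFuture<String> reply = new CompletableFuture<>();
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            // Task 1 runs on the loop thread. Calling reply.get() here would block
            // forever, because task 2, which completes the reply, needs this same thread.
            loop.execute(() -> reply.thenAccept(msg -> seen.complete("got " + msg)));
            // Task 2 plays the role of the inbound handler delivering the response.
            loop.execute(() -> reply.complete("pong"));
            // Blocking is fine here: this is the caller's thread, not the loop thread.
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "got pong"
    }
}
```

In Netty itself the equivalent of thenAccept is adding a listener to the future.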
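You can find an example of this in the Netty project. We can save the result in a custom field of the last handler. In the code below, handler.getFactorial() is what gives us the result we want.
See http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all
FactorialClient.java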
public final class FactorialClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new FactorialClientInitializer(sslCtx));
            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                    (FactorialClientHandler) f.channel().pipeline().last();
            // Print out the answer.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}
public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}
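The core of this approach is the hand-off through a BlockingQueue: the I/O thread offers the result and the calling thread waits in take(). A stripped-down sketch of just that hand-off, without Netty, might look as follows. The class ResultHolder and its methods publish and await are hypothetical names chosen for illustration; the retry loop mirrors getFactorial() above.

```java
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical holder that passes one result from a producer thread to a consumer.
class ResultHolder {
    private final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<>();

    // Called by the producing thread (the I/O thread in the Netty example).
    void publish(BigInteger value) {
        answer.offer(value);
    }

    // Called by the consuming thread; waits until a value is available.
    BigInteger await() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt for the caller.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ResultHolder holder = new ResultHolder();
        new Thread(() -> holder.publish(BigInteger.valueOf(120))).start();
        System.out.println(holder.await()); // prints 120
    }
}
```

Because await() blocks, it belongs on the caller's thread and not on the thread that publishes the result.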
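Calling channel.writeAndFlush(msg); already returns a ChannelFuture. To handle the result of this method call, you can add a listener to the future like this. Note that this future completes when the write operation finishes, not when the server's response arrives.
future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
        // Perform post-closure operation
        // ...
    }
});
(This is taken from the Netty documentation, see: Netty doc)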