Netty服务器无法接收到客户端发送的所有消息。

问题描述 投票:0回答:1

我在项目中有一个netty服务器和客户端,想在它们之间交换消息。

netty服务器的代码。

// Main event loop group; passed as the first group to ServerBootstrap.group(...) in Start
private EventLoopGroup bossGroup = new NioEventLoopGroup();

// Secondary event loop group; passed as the second group to ServerBootstrap.group(...) in Start
private EventLoopGroup workerGroup = new NioEventLoopGroup();

// Server-side channel; assigned once the bind has succeeded and closed again in close()
private Channel serverChannel;

/**
 * Binds the server to the loopback address and starts listening.
 *
 * <p>Blocks until the bind has completed, then registers one listener that
 * records the bound channel and one that releases all resources as soon as
 * that channel is closed.
 *
 * @param port TCP port to listen on
 * @throws Exception if the bind fails or the waiting thread is interrupted
 */
public void Start(int port) throws Exception {

    // Server-side bootstrap helper
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // First group serves the acceptor, second group serves the accepted client channels
    serverBootstrap.group(bossGroup, workerGroup)
            // Channel implementation used for the listening socket
            .channel(NioServerSocketChannel.class)
            // Size of the pending-connection queue
            .option(ChannelOption.SO_BACKLOG, 100)
            // Allow address reuse
            .option(ChannelOption.SO_REUSEADDR, true)
            // TCP_NODELAY is a per-connection socket option, so it has to be applied to the
            // accepted (child) channels; option(...) would target the listening channel instead
            .childOption(ChannelOption.TCP_NODELAY, true)
            // Logging handler for the listening channel
            .handler(new LoggingHandler(LogLevel.INFO))
            // Pipeline of every accepted connection
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    // Decoder (inbound)
                    channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                    // Encoder (outbound)
                    channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                    // Business handler
                    channel.pipeline().addLast("nettyHandler", new MicroServerHandler());
                }
            });

    // Local address to bind to
    String host = "127.0.0.1";

    // Bind and wait synchronously for the result
    ChannelFuture future = serverBootstrap.bind(host, port).sync();

    // Record the bound channel and report that the server is up
    future.addListener(cfl -> {
        if (cfl.isSuccess()) {
            logger.info("服务端[" + host + ":" + port + "]已上线...");
            serverChannel = future.channel();
        }
    });

    // Release everything once the listening channel has been closed
    future.channel().closeFuture().addListener(cfl -> {
        close();
        logger.info("服务端[" + host + ":" + port + "]已下线...");
    });
}

/**
 * Shuts the server down.
 *
 * <p>Closes the listening channel if one has been recorded, then asks the
 * main and the secondary event loop group to shut down gracefully. Each step
 * is skipped when the corresponding field is {@code null}.
 */
public void close() {
    // Listening socket
    if (serverChannel != null) { serverChannel.close(); }

    // Main (acceptor) group
    if (bossGroup != null) { bossGroup.shutdownGracefully(); }

    // Secondary (client channel) group
    if (workerGroup != null) { workerGroup.shutdownGracefully(); }
}

netty客户端代码:

  /**
   * Netty client that connects to the micro server and sends {@code MicroMessage}s.
   *
   * <p>{@link #Connect(String, int)} configures the shared bootstrap and event loop
   * group, and the group is shut down when the connection closes, so an instance
   * is meant to be connected once.
   */
  @Service
  public class MicroClient {

      // Logger
      private static final Logger logger = LoggerFactory.getLogger(MicroClient.class);

      // Event loop group driving the client channel
      private EventLoopGroup group = new NioEventLoopGroup();

      // Bootstrap used to configure and open the connection
      private Bootstrap bootstrap = new Bootstrap();

      // Channel of the established connection; null until Connect has succeeded
      private Channel clientChannel;

      // Business handler of the client pipeline; null until Connect has been called
      private MicroClientHandler microClientHandler;

      /**
       * Connects to the server and blocks until the connection is established.
       *
       * @param host server host name or address
       * @param port server TCP port
       * @throws InterruptedException if the waiting thread is interrupted
       */
      public void Connect(String host, int port) throws InterruptedException {
          microClientHandler = new MicroClientHandler();
          bootstrap.group(group)
                  .channel(NioSocketChannel.class)
                  .option(ChannelOption.TCP_NODELAY, true)
                  .handler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel channel) throws Exception {
                          // Decoder (inbound)
                          channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                          // Encoder (outbound)
                          channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                          // Business handler
                          channel.pipeline().addLast("clientHandler", microClientHandler);
                      }
                  });

          // Connect and wait synchronously for the result
          ChannelFuture channelFuture = bootstrap.connect(host, port).sync();

          // sync() returns only after the future has completed and rethrows a failure,
          // so the former isDone() check was always true and has been dropped
          logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
          clientChannel = channelFuture.channel();

          // Release resources once the connection has been closed
          channelFuture.channel().closeFuture().addListener(cfl -> {
              close();
              logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
          });
      }

      /**
       * Closes the client channel, if any, and shuts the event loop group down gracefully.
       */
      private void close() {
          // Client socket
          if (clientChannel != null) {
              clientChannel.close();
          }
          // Client event loop group
          if (group != null) {
              group.shutdownGracefully();
          }
      }

      /**
       * Sends a message to the server through the client handler.
       *
       * <p>If {@link #Connect(String, int)} has not been called yet there is no
       * handler to send through; the message is dropped and an error is logged
       * instead of failing with a {@code NullPointerException}.
       *
       * @param microMessage message to send
       */
      public void send(MicroMessage microMessage) {
          if (microClientHandler == null) {
              logger.error("client is not connected yet, message dropped...");
              return;
          }
          microClientHandler.send(microMessage);
      }

  }

服务器处理程序代码:

public class MicroServerHandler  extends ChannelInboundHandlerAdapter {

private static final Logger logger = LoggerFactory.getLogger(MicroServerHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    MicroMessage message = (MicroMessage) msg;
    logger.error("receive client message : " + message.getMessage());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
    ctx.close();
}

}

客户端处理程序代码:

/**
 * Inbound handler of the client pipeline. Sends ten test messages as soon as
 * the channel becomes active and keeps the handler context so that
 * {@link #send(MicroMessage)} can write further messages later.
 */
public class MicroClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(MicroClientHandler.class);

    // Assigned in channelActive and read in send(), which may run on a different
    // thread; volatile makes the assignment visible to that thread
    private volatile ChannelHandlerContext ctx;

    /**
     * Stores the context and writes ten numbered, timestamped messages.
     *
     * @param ctx handler context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        for (int i = 0; i < 10; i++) {
            String message = "message timestamp " + System.currentTimeMillis() + " " + i;
            MicroMessage microMessage = new MicroMessage();
            microMessage.setMessage(message);
            ctx.writeAndFlush(microMessage);
            System.out.println("send client message : " + message);
        }
    }

    /**
     * Receives a message from the server. The object is only cast to
     * {@code MicroMessage}; nothing else is done with it.
     *
     * @param ctx handler context of the channel
     * @param msg inbound object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MicroMessage message = (MicroMessage) msg;
    }

    /**
     * Writes and flushes a message, or logs an error when no context has been
     * stored yet.
     *
     * @param microMessage message to send
     */
    public void send(MicroMessage microMessage) {
        // Read the field once so the null check and the write use the same context
        ChannelHandlerContext current = ctx;
        if (current != null) {
            current.writeAndFlush(microMessage);
        } else {
            logger.error("ctx is not prepared well now...");
        }
    }
}

消息解码器代码

/**
 * Decoder that turns received bytes into a {@code MicroMessage}.
 *
 * <p>The overridden {@code decode} below never calls {@code super.decode(...)}
 * and does not use the length-field settings passed to the constructor: it
 * consumes every byte that is currently readable and deserializes them as a
 * single message.
 */
public class MicroMessageDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * @param maxFrameLength    forwarded to the superclass constructor
     * @param lengthFieldOffset forwarded to the superclass constructor
     * @param lengthFieldLength forwarded to the superclass constructor
     */
    public MicroMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    /**
     * Reads all readable bytes of {@code in} and deserializes them.
     *
     * @param ctx handler context of the channel
     * @param in  buffer holding the received bytes
     * @return the deserialized message, or {@code null} when an exception occurred
     */
    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        try {
            final int available = in.readableBytes();
            final byte[] payload = new byte[available];
            in.readBytes(payload, 0, available);
            MicroMessage decoded = MicroSerializeUtil.deserialize(payload, MicroMessage.class);
            return decoded;
        } catch (Exception e) {
            // Failures are only printed; the caller sees "no message"
            System.out.println("exception when decoding: " + e);
            return null;
        }
    }
}

消息编码器代码:

/**
 * Encoder that serializes a {@code MicroMessage} and writes the resulting
 * bytes to the outbound buffer as they are.
 */
public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {

    /**
     * @param ctx handler context of the channel
     * @param msg message to serialize
     * @param out buffer that receives the serialized bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
        final byte[] serialized = MicroSerializeUtil.serialize(msg);
        out.writeBytes(serialized);
    }
}

SerializeUtil代码,我使用protostuff进行序列化和反序列化。

/**
 * Serialization helpers based on protostuff. Every object is wrapped in a
 * private holder before it is written, and unwrapped again after reading.
 */
public class MicroSerializeUtil {

    /** Holder that carries the actual object in a single field. */
    private static class SerializeData {
        private Object target;
    }

    /**
     * Serializes an object to a byte array.
     *
     * @param object object to serialize
     * @return serialized bytes
     * @throws IllegalStateException if serialization fails; the original exception is the cause
     */
    public static byte[] serialize(Object object) {
        SerializeData serializeData = new SerializeData();
        serializeData.target = object;
        LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4);
        try {
            // The class literal avoids the unchecked cast of getClass()
            Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class);
            return ProtobufIOUtil.toByteArray(serializeData, schema, linkedBuffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            linkedBuffer.clear();
        }
    }

    /**
     * Deserializes a byte array produced by {@link #serialize(Object)}.
     *
     * @param data  serialized bytes
     * @param clazz expected type of the wrapped object
     * @param <T>   expected type
     * @return the wrapped object, or {@code null} if none was stored
     * @throws IllegalStateException if reading the bytes fails; the original exception is the cause
     * @throws ClassCastException    if the wrapped object is not an instance of {@code clazz}
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        SerializeData serializeData;
        try {
            Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class);
            serializeData = schema.newMessage();
            ProtobufIOUtil.mergeFrom(data, serializeData, schema);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // Checked cast: a type mismatch is reported here rather than at some later
        // use of the returned reference
        return clazz.cast(serializeData.target);
    }
}

服务器测试如下

// Manual test that starts the server and keeps it running
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ServerTest {

// Server under test, injected through @Resource
@Resource
private MicroServer microServer;

@Test
public void testServer() throws Exception {
    // Listen on port 9023
    microServer.Start(9023);
    // Block on standard input so the test, and with it the server, stays alive
    System.in.read();
}
}

客户端测试如下:

// Manual test that connects the client to a locally running server
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ClientTest {

// Client under test, injected through @Resource
@Resource
private MicroClient microClient;

// Connects to 127.0.0.1:9023 before the test method runs
@Before
public void init() throws InterruptedException {
    microClient.Connect("127.0.0.1",9023);
}

@Test
public void testClient() throws Exception {
    // Block on standard input so the connection stays open
    System.in.read();
}
}

服务器输出如下:

2020-06-10 17:21:54,970 INFO  [nioEventLoopGroup-3-1] micro.MicroServer (MicroServer.java:82) - 服务端[127.0.0.1:9023]已上线...
2020-06-10 17:22:00,232 ERROR [nioEventLoopGroup-4-1] micro.MicroServerHandler (MicroServerHandler.java:21) - receive client message : message timestamp 1591780920120 9

客户端输出如下:

2020-06-10 17:21:59,988 INFO  [main] micro.MicroClient (MicroClient.java:67) - 客户端[/127.0.0.1:49299]已连接...
send client message : message timestamp 1591780919987 0
send client message : message timestamp 1591780920117 1
send client message : message timestamp 1591780920117 2
send client message : message timestamp 1591780920118 3
send client message : message timestamp 1591780920118 4
send client message : message timestamp 1591780920118 5
send client message : message timestamp 1591780920119 6
send client message : message timestamp 1591780920119 7
send client message : message timestamp 1591780920119 8
send client message : message timestamp 1591780920120 9

所以从日志的输出中,我们可以看到,客户端向服务器端发送了10条消息,但服务器端只收到一条消息,我的代码有什么问题吗?我想可能是我用错了什么东西?

EDIT:在MicroMessageDecoder类中,我调试了dstBytes变量,得到以下信息。

P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477591 0
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 1
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 2
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 3
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 4
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 5
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 6
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 7
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 8
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 9

所有的消息都在这里,所以可能是解码器工作出了问题?

netty
1个回答
0
投票

你应该检查 writeAndFlush 返回的 ChannelFuture,以了解写入是否失败。

为此,可以给它添加一个 ChannelFutureListener。

// Register a listener on the future returned by writeAndFlush to observe the result
channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {


    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            // Success branch
            ...
        } else {
            // Failure branch: cause() gives the reason
            Throwable cause = future.cause();
            ...
        }
    }
});

0
投票

终于解决了这个问题。因为客户端连续发送的10条消息可能会被合并在一起到达服务器端(TCP粘包),所以我们需要添加LengthFieldBasedFrameDecoder来拆分消息。

/**
 * Frame decoder that relies entirely on {@code LengthFieldBasedFrameDecoder};
 * it adds no behavior of its own.
 */
public class MicroLengthDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * @param maxFrameLength    forwarded to the superclass constructor
     * @param lengthFieldOffset forwarded to the superclass constructor
     * @param lengthFieldLength forwarded to the superclass constructor
     */
    public MicroLengthDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
}

然后添加如下。

.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    // Split the byte stream into messages; the length header is 4 bytes
                    channel.pipeline().addLast("nettyLengthDecoder", new MicroLengthDecoder(1024 * 1024, 0, 4));
                    channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                    channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder());
                   ....
                }
            });

需要修改下面两个类。

/**
 * Decoder that turns one received frame into a {@code MicroMessage}. The
 * first four bytes of the frame are discarded before deserialization.
 */
public class MicroMessageDecoder extends ByteToMessageDecoder {

    // Number of leading bytes that are dropped from every frame
    private static final int HEADER_LENGTH = 4;

    /**
     * Reads all readable bytes, drops the header and adds the deserialized
     * message to the output list. Exceptions are printed and otherwise ignored.
     *
     * @param ctx handler context of the channel
     * @param in  buffer holding the received bytes
     * @param obj list that receives the decoded message
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> obj) throws Exception {
        try {
            final int available = in.readableBytes();
            final byte[] frame = new byte[available];
            in.readBytes(frame, 0, available);

            // Copy everything after the header into its own array
            final byte[] body = new byte[frame.length - HEADER_LENGTH];
            System.arraycopy(frame, HEADER_LENGTH, body, 0, body.length);

            MicroMessage decoded = MicroSerializeUtil.deserialize(body, MicroMessage.class);
            obj.add(decoded);
        } catch (Exception e) {
            System.out.println("exception when decoding: " + e);
        }
    }
}


/**
 * Encoder that serializes a {@code MicroMessage} and writes it as a 4-byte
 * length prefix followed by the serialized bytes.
 */
public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {

    /**
     * Writes the length prefix and the payload directly into {@code out}, so
     * no intermediate buffer is allocated and the payload is copied only once.
     *
     * @param ctx handler context of the channel
     * @param msg message to serialize
     * @param out buffer that receives prefix and payload
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
        byte[] data = MicroSerializeUtil.serialize(msg);
        out.writeBytes(intToBytes(data.length));
        out.writeBytes(data);
    }

    /**
     * Converts an int to four bytes, most significant byte first. The result is
     * used as the header that lets the receiver split the stream into messages.
     *
     * @param num value to convert
     * @return array of four bytes
     */
    public byte[] intToBytes(int num) {
        byte[] bytes = new byte[4];
        for (int i = 0; i < 4; i++) {
            bytes[i] = (byte) (num >> (24 - i * 8));
        }
        return bytes;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.