Reactor Netty custom channel handlers are not being invoked


I cloned the UDP tutorial from here in order to create a simple JUnit test that does the following:

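  1. Accepts a datagram packet and decodes it into some type (a plain String for now).
  2. After decoding, appends a string to it and tries to send the result back as the response.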
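
For reference, the two steps above can be sketched independently of Reactor Netty. This is only an illustration using plain `java.net` types, not the tutorial's code: the class name `UdpEchoSketch`, its methods, the UTF-8 encoding, and the appended suffixes are all assumptions made for the example.

```java
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the tutorial: plain java.net, no Netty types.
final class UdpEchoSketch {

    // Step 1: decode the datagram payload into a String (UTF-8 is assumed).
    static String decode(DatagramPacket packet) {
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    // Step 2: append a marker and address the reply to the original sender.
    static DatagramPacket respond(DatagramPacket request) {
        byte[] payload = (decode(request) + " decoded responded")
                .getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(payload, payload.length, request.getSocketAddress());
    }

    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        DatagramPacket request = new DatagramPacket(
                data, data.length, new InetSocketAddress("127.0.0.1", 7686));
        DatagramPacket reply = respond(request);
        System.out.println(decode(reply)); // prints "hello decoded responded"
    }
}
```

No socket is opened here; the sketch only shows the request-to-response transformation that the handlers below are meant to perform.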

This is my main class:

import org.junit.jupiter.api.Test;

import io.netty.channel.ChannelOption;
import reactor.netty.udp.UdpServer;

public class CustomUdpServer {

    private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));
    private static final boolean WIRETAP = System.getProperty("wiretap") != null;

    @Test
    public void build() throws Exception {
        UdpServer server =
                UdpServer.create()
                         .port(PORT)
                         .wiretap(WIRETAP)
                         .option(ChannelOption.SO_BROADCAST, true)
                         .doOnBound(conn -> conn.addHandlerLast(new SimpleRequestDecoder()))
                         .doOnChannelInit((observer, channel, remoteAddress) ->
                                 channel.pipeline().addFirst(new SimpleResponseHandler()));

        server.bindNow()
              .onDispose()
              .block();
    }
}

This is my decoder:

import java.nio.charset.StandardCharsets;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@NoArgsConstructor
public class SimpleRequestDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Read directly from `in` so its reader index advances; emitting a message
        // without consuming any input makes ByteToMessageDecoder throw a DecoderException.
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        String content = new String(bytes, StandardCharsets.UTF_8) + " decoded";
        log.info("--> " + content);
        out.add(content);
    }
}

This is my response handler:

import java.nio.charset.StandardCharsets;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SimpleResponseHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(this.getClass().getSimpleName()+" started -> "+ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel() != null) {
            log.info(this.getClass().getSimpleName()+" inactive -> "+ctx.channel());
        }
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String content = String.valueOf(msg);
        
        if(msg instanceof DatagramPacket) {
              DatagramPacket packet = (DatagramPacket) msg;
              content = packet.content().toString(StandardCharsets.UTF_8);
        }
        
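        content += " responded";
        log.info("-> " + content);

        ctx.writeAndFlush(content);
        // Forward to the next handler; calling super.channelRead(...) here would
        // re-enter channelRead0 and recurse until the stack overflows.
        ctx.fireChannelRead(content);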
    }
}

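I used the same UDP client as the one on GitHub, but when I try to send a message, neither the attached decoder nor the response handler appears to be invoked.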
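
To tell "the server never replied" apart from a client-side issue, a small stand-alone probe can help. The following is a minimal sketch using only `java.net`, not the GitHub client mentioned above; the class name `UdpProbe`, the method `sendAndAwait`, the 2048-byte buffer, and the one-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical diagnostic client: sends one datagram and waits briefly for a reply.
final class UdpProbe {

    // Returns the reply text, or Optional.empty() if nothing arrives within the timeout.
    static Optional<String> sendAndAwait(InetAddress host, int port, String message,
                                         int timeoutMillis) throws IOException {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setSoTimeout(timeoutMillis);
            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            socket.send(new DatagramPacket(out, out.length, host, port));

            DatagramPacket in = new DatagramPacket(new byte[2048], 2048);
            try {
                socket.receive(in);
            } catch (SocketTimeoutException e) {
                return Optional.empty();
            }
            return Optional.of(new String(in.getData(), in.getOffset(), in.getLength(),
                    StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // Same default port as the server's "port" system property.
        int port = Integer.parseInt(System.getProperty("port", "7686"));
        Optional<String> reply =
                sendAndAwait(InetAddress.getLoopbackAddress(), port, "hello", 1000);
        System.out.println(reply.map(r -> "reply: " + r).orElse("no reply within timeout"));
    }
}
```

If the probe reports no reply while the server log also shows no handler output, the datagram is most likely not reaching the handlers at all, which matches the symptom described here.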

Can anyone give me some advice or pointers?

java spring spring-boot netty reactor-netty