I cloned the UDP tutorial from here in order to create a simple JUnit test for the following.
This is my main class:
import org.junit.jupiter.api.Test;
import io.netty.channel.ChannelOption;
import reactor.netty.udp.UdpServer;

public class CustomUdpServer {
    private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));
    private static final boolean WIRETAP = System.getProperty("wiretap") != null;

    @Test
    public void build() throws Exception {
        UdpServer server =
            UdpServer.create()
                .port(PORT)
                .wiretap(WIRETAP)
                .option(ChannelOption.SO_BROADCAST, true)
                .doOnBound(conn -> conn.addHandlerLast(new SimpleRequestDecoder()))
                .doOnChannelInit((observer, channel, remoteAddress) ->
                    channel.pipeline()
                        .addFirst(new SimpleResponseHandler()));
        server.bindNow()
            .onDispose()
            .block();
    }
}
This is my decoder:
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@NoArgsConstructor
public class SimpleRequestDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer(in);
        byte[] bytes = new byte[in.readableBytes()];
        buf.readBytes(bytes);
        String content = new String(bytes) + " decoded";
        log.info("--> " + content);
        out.add(content);
    }
}
This is my response handler:
import java.nio.charset.StandardCharsets;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SimpleResponseHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(this.getClass().getSimpleName() + " started -> " + ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel() != null) {
            log.info(this.getClass().getSimpleName() + " inactive -> " + ctx.channel());
        }
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String content = String.valueOf(msg);
        if (msg instanceof DatagramPacket) {
            DatagramPacket packet = (DatagramPacket) msg;
            content = packet.content().toString(StandardCharsets.UTF_8);
        }
        content += " responsed";
        log.info("-> " + content);
        ctx.writeAndFlush(content);
        super.channelRead(ctx, content);
    }
}
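I used the same UDP client as the one on GitHub, but when I try to send a message, the decoder and response handler I attached do not appear to be invoked.
Can anyone give me some advice or pointers?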
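
To reproduce the problem without the tutorial's client, a plain `java.net` sender can stand in. The sketch below is not the client from GitHub. The class name `UdpProbe`, the `127.0.0.1` host and the "hello" payload are assumptions chosen for illustration; only the default port 7686 comes from the server code above.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in sender; not the client from the GitHub tutorial.
final class UdpProbe {

    // Wraps a UTF-8 message in a datagram addressed to host:port.
    static DatagramPacket buildPacket(String host, int port, String message) throws Exception {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port);
    }

    // Sends one datagram and returns the payload length in bytes.
    static int send(String host, int port, String message) throws Exception {
        DatagramPacket packet = buildPacket(host, port, message);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(packet);
        }
        return packet.getLength();
    }

    public static void main(String[] args) throws Exception {
        // 7686 is the default port used by CustomUdpServer; the host is an assumption.
        int port = Integer.parseInt(System.getProperty("port", "7686"));
        int sent = send("127.0.0.1", port, "hello");
        System.out.println("sent " + sent + " bytes to port " + port);
    }
}
```

With the server test running, executing `UdpProbe.main` sends a single datagram; if neither the "-->" nor the "->" log line shows up on the server side, the handlers were not reached for that packet.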