我正在用 netty 编写应该连接到远程服务器的代码。
下面是我的完整代码。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.logging.LogLevel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.example.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Minimal Netty client helper that owns a {@link Bootstrap} and opens outbound TCP connections
 * whose pipeline contains a lifecycle-logging handler, an {@link HttpRequestEncoder} and an
 * optional caller-supplied handler.
 *
 * <p>The shared {@link Bootstrap} is mutated on every connect, so connects are serialized on it.
 */
public class NettyConnectionDemo {
    /** Shared logger; a logger does not need to be created per instance. */
    static final Logger log = LoggerFactory.getLogger(NettyConnectionDemo.class);

    /** Channel implementation the bootstrap instantiates for each connection. */
    protected final Class<? extends SocketChannel> channelClass;

    /** Event loop group that drives all channels created by this instance. */
    protected final EventLoopGroup ioWorkers;

    /** Bootstrap reused for every connection attempt. */
    protected final Bootstrap booter = new Bootstrap();

    /** Stored log level; this class only stores and returns it, it installs no logging handler. */
    protected LogLevel level;

    /** Connect timeout in milliseconds, applied to the bootstrap before each connect. */
    protected long connectTimeOutMills = TimeUnit.SECONDS.toMillis(3);

    /**
     * Creates a client on top of a caller-supplied channel class and event loop group.
     *
     * @param channelClass channel implementation matching {@code ioWorkers}
     * @param ioWorkers event loop group; shut down by {@link #destroy()}
     */
    public NettyConnectionDemo(Class<? extends SocketChannel> channelClass, EventLoopGroup ioWorkers) {
        this.channelClass = channelClass;
        this.ioWorkers = ioWorkers;
        init();
    }

    /** Creates a client with a thread count of 0, i.e. whatever the event loop group defaults to. */
    public NettyConnectionDemo() {
        this(0);
    }

    /**
     * Creates a client that uses epoll when available and NIO otherwise.
     *
     * @param ioThreads number of I/O threads; negative values are treated as 0
     */
    public NettyConnectionDemo(int ioThreads) {
        int threads = Math.max(ioThreads, 0);
        NamedThreadFactory threadFactory = new NamedThreadFactory("ClientConfig-ioWorkers", true);
        if (Epoll.isAvailable()) {
            this.channelClass = EpollSocketChannel.class;
            this.ioWorkers = new EpollEventLoopGroup(threads, threadFactory);
        } else {
            this.channelClass = NioSocketChannel.class;
            this.ioWorkers = new NioEventLoopGroup(threads, threadFactory);
        }
        init();
    }

    /** @return the channel implementation used for new connections */
    public Class<? extends SocketChannel> getChannelClass() {
        return channelClass;
    }

    /** @return the event loop group driving this client's channels */
    public EventLoopGroup getIoWorkers() {
        return ioWorkers;
    }

    /** @return the stored log level, possibly {@code null} */
    public LogLevel getLevel() {
        return level;
    }

    /** @param level log level to store */
    public void setLevel(LogLevel level) {
        this.level = level;
    }

    /** @return the connect timeout in milliseconds */
    public long getConnectTimeOutMills() {
        return connectTimeOutMills;
    }

    /** @param connectTimeOutMills connect timeout in milliseconds, used by subsequent connects */
    public void setConnectTimeOutMills(long connectTimeOutMills) {
        this.connectTimeOutMills = connectTimeOutMills;
    }

    /**
     * Starts a graceful shutdown of the event loop group.
     *
     * <p>Kept under its original (misspelled) name for existing callers; {@link #destroy()} is the
     * correctly spelled equivalent.
     */
    public void destory() {
        if (ioWorkers != null) {
            ioWorkers.shutdownGracefully();
        }
    }

    /** Starts a graceful shutdown of the event loop group; delegates to {@link #destory()}. */
    public void destroy() {
        // Delegate to the legacy name so subclasses that override it are still honoured.
        destory();
    }

    /** Marker base class so the per-connect bootstrap handler is annotated as sharable. */
    @ChannelHandler.Sharable
    static class ShareableChannelInboundHandler extends ChannelInboundHandlerAdapter {}

    /** Logs channel lifecycle events and notifies an optional listener when the channel closes. */
    private static final class LifecycleLoggingHandler extends ChannelInboundHandlerAdapter {
        private final Consumer<ChannelHandlerContext> closeListener;

        LifecycleLoggingHandler(Consumer<ChannelHandlerContext> closeListener) {
            this.closeListener = closeListener;
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            log.info("channelRegistered:{}", ctx.channel());
            super.channelRegistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("channelActive:{}", ctx.channel());
            super.channelActive(ctx);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            log.info("channelInactive:{}", ctx.channel());
            if (closeListener != null) {
                try {
                    closeListener.accept(ctx);
                } catch (Throwable e) {
                    // Best effort: a failing listener must not stop the event from propagating.
                    // Use a fixed format string; the exception message may be null or contain "{}".
                    log.error("closeListener failed for {}", ctx.channel(), e);
                }
            }
            super.channelInactive(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            log.info("channelUnregistered:{}", ctx.channel());
            super.channelUnregistered(ctx);
        }
    }

    /** @return the bootstrap used by this client */
    Bootstrap getBooter() {
        return booter;
    }

    /** Binds the event loop group and channel class to the bootstrap. */
    private void init() {
        booter.group(ioWorkers);
        booter.channel(channelClass);
    }

    /**
     * Applies bootstrap options; currently the connect timeout taken from
     * {@link #getConnectTimeOutMills()}, clamped to the non-negative {@code int} range that the
     * option accepts.
     */
    protected void initBooterOptions() {
        long clamped = Math.max(0L, Math.min(connectTimeOutMills, (long) Integer.MAX_VALUE));
        booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) clamped);
    }

    /**
     * Builds the one-shot bootstrap handler that populates a new channel's pipeline on
     * registration and then removes itself.
     *
     * @param init optional handler appended after the HTTP request encoder; may be {@code null}
     * @param closeListener optional callback invoked when the channel becomes inactive; may be
     *     {@code null}
     * @return the handler to install on the bootstrap
     */
    protected ChannelHandler initHandlerAdapter(ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        return new ShareableChannelInboundHandler() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Channel ch = ctx.channel();
                ch.pipeline().addLast(new LifecycleLoggingHandler(closeListener));
                ch.pipeline().addLast(new HttpRequestEncoder());
                if (init != null) {
                    // Skip the optional handler explicitly instead of handing null to the pipeline.
                    ch.pipeline().addLast(init);
                }
                // This initializer is done; drop it and let the newly added handlers see the event.
                ctx.pipeline().remove(this);
                ctx.fireChannelRegistered();
            }
        };
    }

    /**
     * Configures the shared bootstrap for one connection attempt and starts it.
     *
     * @param address remote address to connect to
     * @param init optional application handler; may be {@code null}
     * @param closeListener optional close callback; may be {@code null}
     * @return the future of the connection attempt
     */
    protected ChannelFuture doBooterConnect(InetSocketAddress address, final ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        // The bootstrap is shared mutable state: setting the handler and connecting must be atomic.
        synchronized (booter) {
            initBooterOptions();
            booter.handler(initHandlerAdapter(init, closeListener));
            return booter.connect(address);
        }
    }

    /**
     * Connects without an application handler.
     *
     * @param address remote address to connect to
     * @return the future of the connection attempt
     */
    public final ChannelFuture connect(InetSocketAddress address) {
        return doBooterConnect(address, null, null);
    }

    /**
     * Connects and appends {@code handler} to the end of the new channel's pipeline.
     *
     * @param address remote address to connect to
     * @param handler application handler; may be {@code null}
     * @return the future of the connection attempt
     */
    public final ChannelFuture connect(InetSocketAddress address, ChannelHandler handler) {
        return doBooterConnect(address, handler, null);
    }

    /** Demo entry point: connects to google.com:80, prints the channel and shuts down. */
    public static void main(String[] args) throws InterruptedException {
        NettyConnectionDemo cb = new NettyConnectionDemo(NioSocketChannel.class, new NioEventLoopGroup());
        try {
            ChannelFuture cf = cb.connect(new InetSocketAddress("google.com", 80)).syncUninterruptibly();
            System.out.println(cf.channel());
            cf.channel().close().syncUninterruptibly();
        } finally {
            // Without this the event loop threads would keep the JVM alive after main returns.
            cb.destory();
        }
    }
}
当我连接到 google.com、yahoo.com 等远程计算机时,此代码工作正常。
当我把代码改成如下所示、去连接本地运行的 HTTP 服务器时,代码会抛出异常。而用 curl 对该服务器发起 REST 调用是正常的。
ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 3000)).syncUninterruptibly();
ChannelFuture cf = cb.connect(new InetSocketAddress("127.0.0.1", 3000)).syncUninterruptibly();
两者都不起作用。
我遇到的异常是:
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:3000
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)
我怎样才能让它也与本地服务器一起工作?
当您使用 Netty 建立连接时,它通过 NIO 系统库直接连接到端点,不会像 curl 或浏览器那样使用任何额外的智能逻辑。
这里的问题是,您用 Node.js 创建的服务器监听在 `localhost` 上。Node.js 按照系统的优先级解析域名,因此 `localhost` 的第一个解析结果是 IPv6 地址 `::1`。
您在运行 Java 进程时没有使用 `-Djava.net.preferIPv6Addresses=true` 标志,因此 Java 以兼容旧程序的模式运行,总是优先把域名解析为 IPv4 地址。对于 localhost,解析结果是 `127.0.0.1`。由于您的服务器并没有监听该 IP,而 Netty 只会尝试连接解析出的单个地址,所以连接失败。
出现错误的示例:
$ netstat -nlpa
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 ::1:3000 :::* LISTEN 11684/node
$ jshell
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/127.0.0.1
$ jshell -R-Djava.net.preferIPv6Addresses=true -J-Djava.net.preferIPv6Addresses=true
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/0:0:0:0:0:0:0:1
要修复,请使用以下任一解决方案:
- 启动 Java 时加上 `-Djava.net.preferIPv6Addresses=true`,以禁用向后兼容模式