netty 通道正在连接到远程服务器,但不是 localhost/127.0.0.1

问题描述 投票:0回答:1

我正在用 netty 编写应该连接到远程服务器的代码。

下面是我的完整代码。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.logging.LogLevel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.example.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyConnectionDemo {

    protected final Class<? extends SocketChannel> channelClass;
    protected final EventLoopGroup ioWorkers;

    protected LogLevel level;

    Logger log = LoggerFactory.getLogger(NettyConnectionDemo.class);
    protected long connectTimeOutMills = TimeUnit.SECONDS.toMillis(3);

    public NettyConnectionDemo(Class<? extends SocketChannel> channelClass, EventLoopGroup ioWorkers) {
        this.channelClass = channelClass;
        this.ioWorkers = ioWorkers;
        init();
    }

    public NettyConnectionDemo() {
        this(0);
    }

    public NettyConnectionDemo(int ioThreads) {
        if (ioThreads < 0) {
            ioThreads = 0;
        }
        EventLoopGroup workerGroup;
        Class<? extends SocketChannel> channelClass;
        if (Epoll.isAvailable()) {
            channelClass = EpollSocketChannel.class;
            workerGroup = new EpollEventLoopGroup(ioThreads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        } else {
            channelClass = NioSocketChannel.class;
            workerGroup = new NioEventLoopGroup(ioThreads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        }
        this.channelClass = channelClass;
        this.ioWorkers = workerGroup;
        init();
    }

    public Class<? extends SocketChannel> getChannelClass() {
        return channelClass;
    }

    public EventLoopGroup getIoWorkers() {
        return ioWorkers;
    }

    public LogLevel getLevel() {
        return level;
    }

    public void setLevel(LogLevel level) {
        this.level = level;
    }

    public long getConnectTimeOutMills() {
        return connectTimeOutMills;
    }

    public void setConnectTimeOutMills(long connectTimeOutMills) {
        this.connectTimeOutMills = connectTimeOutMills;
    }

    public void destory() {
        if (ioWorkers != null) {
            ioWorkers.shutdownGracefully();
        }
    }

    protected final Bootstrap booter = new Bootstrap();

    @ChannelHandler.Sharable
    class ShareableChannelInboundHandler extends ChannelInboundHandlerAdapter {}

    Bootstrap getBooter() {
        return booter;
    }

    private void init() {
        booter.group(ioWorkers);
        booter.channel(channelClass);
    }

    protected void initBooterOptions() {
        booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000);
    }

    protected ChannelHandler initHandlerAdapter(ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        ChannelHandler handler = new ShareableChannelInboundHandler() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Channel ch = ctx.channel();
                LogLevel level = getLevel();
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        log.info("channelRegistered:{}", ctx.channel());
                        super.channelRegistered(ctx);
                    }

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        log.info("channelActive:{}", ctx.channel());
                        super.channelActive(ctx);
                    }

                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        log.info("channelInactive:{}", ctx.channel());
                        if (closeListener != null) {
                            try {
                                closeListener.accept(ctx);
                            } catch (Throwable e) {
                                log.error(e.getMessage(), e);
                            }
                        }
                        super.channelInactive(ctx);
                    }

                    @Override
                    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                        log.info("channelUnregistered:{}", ctx.channel());
                        super.channelUnregistered(ctx);
                    }
                });
                
                ch.pipeline().addLast(new HttpRequestEncoder());
                ch.pipeline().addLast(init);
                ctx.pipeline().remove(this);
                ctx.fireChannelRegistered();
            }
        };
        return handler;
    }

    protected ChannelFuture doBooterConnect(InetSocketAddress address, final ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        ChannelFuture cf;
        synchronized (booter) {
            ChannelHandler handler = initHandlerAdapter(init, closeListener);
            booter.handler(handler);
            cf = booter.connect(address);
        }
        return cf;
    }

    public final ChannelFuture connect(InetSocketAddress address) {
        return doBooterConnect(address, null, null);
    }

    public final ChannelFuture connect(InetSocketAddress address, ChannelHandler handler) {
        return doBooterConnect(address, handler, null);
    }

    public static void main(String[] args) throws InterruptedException {
        NettyConnectionDemo cb = new NettyConnectionDemo(NioSocketChannel.class, new NioEventLoopGroup());
        ChannelFuture cf = cb.connect(new InetSocketAddress("google.com", 80)).syncUninterruptibly();
        System.out.println(cf.channel());
    }
}

当我连接到 google.com、yahoo.com 等远程计算机时,此代码工作正常。

当我更改如下代码以连接到本地运行的 http 服务器时,代码抛出异常。我可以在curl中进行REST调用,它工作正常。

ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 3000)).syncUninterruptibly();
ChannelFuture cf = cb.connect(new InetSocketAddress("127.0.0.1", 3000)).syncUninterruptibly();

两者都不起作用。

我遇到的例外是 -

SLF4J:默认为无操作(NOP)记录器实现 SLF4J:请参阅 https://www.slf4j.org/codes.html#noProviders 了解更多详细信息。 线程“main”中的异常 io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:3000 原因:java.net.ConnectException:连接被拒绝 在 sun.nio.ch.SocketChannelImpl.checkConnect(本机方法) 在sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) 在io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) 在 io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) 在 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) 在io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 在io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 在io.netty.util.concurrent.SingleThreadEventExecutor $4.run(SingleThreadEventExecutor.java:997) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:750)

我怎样才能让它也与本地服务器一起工作?

netty netty4
1个回答
0
投票

当您使用netty连接时,它使用NIO系统库连接到端点,它不使用任何其他智能逻辑,如curl或您的浏览器。

这里的问题是您使用节点创建的服务器正在

localhost
上运行。节点根据系统优先级解析域名,所以
localhost
的第一个结果就变成了IPv6
::1

当您运行 Java 进程时,您不会使用

-Djava.net.preferIPv6Addresses=true
标志来启动它,因此 Java 以旧程序的兼容模式运行,并且始终首先将任何域解析为 IPv4。对于本地主机,这是
127.0.0.1
,因为您的服务器没有在该 IP 上运行,并且它只尝试连接到单个 DNS 记录,所以它不起作用。

出现错误的示例:

$ netstat -nlpa
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address   Foreign Address  State       PID/Program name
tcp6       0      0 ::1:3000        :::*             LISTEN      11684/node

$ jshell
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/127.0.0.1

$ jshell -R-Djava.net.preferIPv6Addresses=true -J-Djava.net.preferIPv6Addresses=true
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/0:0:0:0:0:0:0:1

要修复,请使用任一解决方案:

  • 启动程序时始终传递标志
    -Djava.net.preferIPv6Addresses=true
    以禁用向后兼容性
  • 将域解析到所有 IP 地址并按顺序尝试,直到达到超时
  • 将域解析为所有 IP 地址并同时连接到它们,每次新尝试都会有一些延迟,选择第一个有效的(Happy Eyeballs 算法,由 Curl 和 Chromium 使用。Curl 默认为 200 毫秒)
© www.soinside.com 2019 - 2024. All rights reserved.