Using Netty HttpProxyHandler


I am implementing a Netty client that sits behind a corporate proxy (in fact, I am trying to customize Camel's netty-http component to use an HTTP proxy).

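When I use my client configuration directly, without the HttpProxyHandler (on a machine outside the corporate network), it works. But when I run the code on a machine inside the corporate network, I get this error message (target host anonymized):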

java.net.ConnectException: Cannot connect to myserver.example.de:443
    at org.apache.camel.component.netty.NettyProducer$ChannelConnectedListener.operationComplete(NettyProducer.java:699)
    at org.apache.camel.component.netty.NettyProducer$ChannelConnectedListener.operationComplete(NettyProducer.java:685)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
    at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:110)
    at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:214)
    at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
    at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
    at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.UnknownHostException: myserver.example.de
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1386)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1307)
    at java.base/java.net.InetAddress.getByName(InetAddress.java:1257)
    at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
    at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
    at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
    at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
    at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
    at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
    at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
    at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
    ... 23 more

This is how I initialize the client. Note the HttpProxyHandler added right after the SslHandler; that is the change I made to get support for the corporate proxy.

    @Override
    protected void initChannel(Channel ch) throws Exception {
        // create a new pipeline
        ChannelPipeline pipeline = ch.pipeline();

        SslHandler sslHandler = configureClientSSLOnDemand();
        if (sslHandler != null) {
            LOG.debug("Client SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
            pipeline.addLast("ssl", sslHandler);
        }

        HttpProxyHandler httpProxyHandler = new HttpProxyHandler(
                new InetSocketAddress("myproxy.corporate.de", 3128));
        pipeline.addLast("httpproxy", httpProxyHandler);

        pipeline.addLast("http", new HttpClientCodec());


        List<ChannelHandler> encoders = producer.getConfiguration()
            .getEncodersAsList();
        for (int x = 0; x < encoders.size(); x++) {
            ChannelHandler encoder = encoders.get(x);
            if (encoder instanceof ChannelHandlerFactory) {
                // use the factory to create a new instance of the channel as it may not be shareable
                encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
            }
            pipeline.addLast("encoder-" + x, encoder);
        }

        List<ChannelHandler> decoders = producer.getConfiguration()
            .getDecodersAsList();
        for (int x = 0; x < decoders.size(); x++) {
            ChannelHandler decoder = decoders.get(x);
            if (decoder instanceof ChannelHandlerFactory) {
                // use the factory to create a new instance of the channel as it may not be shareable
                decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
            }
            pipeline.addLast("decoder-" + x, decoder);
        }
        if (configuration.isDisableStreamCache()) {
            pipeline.addLast("inbound-streamer", new HttpInboundStreamHandler());
        }
        pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
        pipeline.addLast("outbound-streamer", new HttpOutboundStreamHandler());

        if (producer.getConfiguration()
            .getRequestTimeout() > 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Using request timeout {} millis", producer.getConfiguration()
                    .getRequestTimeout());
            }
            ChannelHandler timeout
                = new ReadTimeoutHandler(producer.getConfiguration()
                .getRequestTimeout(), TimeUnit.MILLISECONDS);
            pipeline.addLast("timeout", timeout);
        }

        // handler to route Camel messages
        pipeline.addLast("handler", new HttpClientChannelHandler(producer));
    }

ProxyHandler.handlerAdded is called, which in turn calls HttpProxyHandler.addCodec. There, the HTTP proxy handler adds its inner class HttpClientCodecWrapper using addBefore("httpproxy", null, codecWrapper). HttpClientCodecWrapper receives its handlerAdded event and passes it on to the wrapped HttpClientCodec. HttpClientCodec sets its inboundCtx and outboundCtx and records that it has been added by setting handlerAdded=true.

ProxyHandler.handlerAdded then finds that ctx.channel().isActive() is false and does nothing.

The ProxyHandler.channelActive event is never fired, and ProxyHandler.connect is never called. Instead, my Netty pipeline tells me that it cannot connect to the target host.
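
The `Caused by` section of the trace points at name resolution inside `Bootstrap.doResolveAndConnect0`, which would explain why `ProxyHandler.connect` is never reached: the target name is looked up locally before any connect is attempted. A minimal sketch for checking this, using only the JDK, is shown below. The class and method names (`ResolveCheck`, `resolvesLocally`, `unresolvedTarget`) are hypothetical and not part of Netty or Camel. It contrasts a local DNS lookup with an unresolved address, which keeps only the host name and port so that a proxy could resolve the name instead.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper class for illustration; not part of Netty or Camel.
class ResolveCheck {

    /** Returns true if this JVM can resolve the host name through local DNS. */
    static boolean resolvesLocally(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            // Same exception type as in the "Caused by" part of the trace.
            return false;
        }
    }

    /** Builds a target address without any DNS lookup; only name and port are kept. */
    static InetSocketAddress unresolvedTarget(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        // The anonymized host from the trace is used as a default placeholder.
        String host = args.length > 0 ? args[0] : "myserver.example.de";
        InetSocketAddress target = unresolvedTarget(host, 443);
        System.out.println("unresolved: " + target.isUnresolved());
        System.out.println("host name kept for the proxy: " + target.getHostString());
        System.out.println("local DNS can resolve it: " + resolvesLocally(host));
    }
}
```

If the local lookup fails inside the corporate network, the failure is happening before the proxy handler is involved. Netty's `Bootstrap` has a `resolver(...)` setting, and Netty ships `NoopAddressResolverGroup`, which leaves the address unresolved. Whether Camel's netty component exposes a way to set it is an assumption that would need checking.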

What am I doing wrong?

apache-camel netty