Apache NiFi - 通道在处理完成之前未注册

问题描述 投票:0回答:0

我有一个非常不清楚和难以理解的问题。在 NiFi 流程中,某些处理器使用服务:

  • DistributedMapCacheClientService
    (1.20.0 版)
  • DistributedMapCacheServer
    (1.20.0 版)

DistributedMapCacheClientService
连接到
DistributedMapCacheServer
.

出于某种原因,我们经常得到:

java.io.IOException: Request invocation failed
    at org.apache.nifi.distributed.cache.client.CacheClientRequestHandler.invoke(CacheClientRequestHandler.java:103)
    at org.apache.nifi.distributed.cache.client.MyDistributedCacheClient.invoke(DistributedCacheClient.java:69)
    at org.apache.nifi.distributed.cache.client.NettyDistributedMapCacheClient.getAndPutIfAbsent(NettyDistributedMapCacheClient.java:140)
    at org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService.getAndPutIfAbsent(DistributedMapCacheClientService.java:154)
    at jdk.internal.reflect.GeneratedMethodAccessor471.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:105)
    at com.sun.proxy.$Proxy175.getAndPutIfAbsent(Unknown Source)
    at org.apache.nifi.processors.standard.DetectDuplicate.onTrigger(DetectDuplicate.java:184)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Channel unregistered before processing completed: [id: 0x7288e0ed, L:0.0.0.0/0.0.0.0:38868]
    at org.apache.nifi.distributed.cache.client.CacheClientRequestHandler.channelUnregistered(CacheClientRequestHandler.java:70)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:219)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:188)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

端口 112233 是一个例子。

这是

DistributedMapCacheClientService
的配置: 主机名和端口都配置好。

在某些情况下,我有一个严重的怀疑,这导致将 FlowFiles 堆积到传入连接/队列中(不确定 100% 这是原因,但很有可能)并阻止该处理器将 FlowFiles 进一步传输到传出连接中。

我已经检查了所有的日志文件,但我无法找到这个问题的根本原因。

有什么帮助吗?

更新:

我们一直在通过从NiFi源连接到远程JVM来调试项目。我们设法发现,

requestHandler
,从下面的来源 (
DistributedCacheClient
),得到
null
.

protected void invoke(final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException {
        final Channel channel = channelPool.acquire().syncUninterruptibly().getNow();
        try {
            final CacheClientRequestHandler requestHandler = channel.pipeline().get(CacheClientRequestHandler.class);
            requestHandler.invoke(channel, outboundAdapter, inboundAdapter);
        } finally {
            channelPool.release(channel).syncUninterruptibly();
        }
}

为什么这会是

null
以及在什么情况下?

java netty apache-nifi netty-socketio
© www.soinside.com 2019 - 2024. All rights reserved.