调试复杂NiFi数据流的理想方式

问题描述 投票:0回答:2

根据我在使用NiFi构建一些数据库提取PoC后的理解,整个数据流作为流文件流运行。并且在任何特定时间,执行控制可以同时在一个或多个处理器处。

因此,我对如何为任何故障调试复杂数据流感到困惑。

我的PoC工作流本身就是这样的。 nifi-dataflow

当我们使用生产用例时,它会比这复杂得多。所以我几乎没有问题。

  1. 如何知道数据流的状态。如果让我们说GenerateTableFetch中有4个10分叉流文件因数据库池错误而失败,我怎么知道哪些失败以及如何快速重放它们而不需要数据来源并逐一进行。
  2. 有没有办法通过查看哪个处理器失败的流文件的数据流来了解。

我对使用NiFi调试数据流有很多怀疑/困惑,如果有人可以指点我一些文档或分享最佳实践,那将会有所帮助。

谢谢。

apache-nifi hortonworks-data-platform hortonworks-dataflow
2个回答
4
投票

1-如何了解数据流的状态。如果让我们说GenerateTableFetch中有4个10分叉流文件因数据库池错误而失败,我怎么知道哪些失败以及如何快速重放它们而不进行数据治理并逐个进行。

您可以通过具有类型故障或任何其他类型的关系进行管理,具体取决于您使用的处理器类型发送到进程组来处理错误。

就像布莱恩提到的那样,除非你不在乎,否则你不希望它们自动终止。

2-有没有办法通过查看哪个处理器失败的流文件的数据流来了解。

是 - 您必须设置“公告级别”以分散日志级别

如何管理失败的NiFi流量?

那么你需要成为BuletinBoard的最好的朋友,请看这里SiteToSiteStatusReportingTask 或者你可以使用InvokeHttp来对抗本地NiFI Rest Api,并且需要对http://nifi-server:port/nifi-api/flow/bulletin-board进行GET调用,这将回复一个详细的json对象,可以解析然后推送到PutSlack / PutEmail / PutSNS是否有任何错误。

同样理想的是让Shared Process Group处理任何传入的错误流文件,这个PG将使用规则和路由构建,以应用于您的NiFi服务器中的所有数据流逻辑。具有PG特定属性至关重要,这些属性将随所有流程一起提供,并将在数据流程中使用。

例如:

进程组“Demo”有一个名为Set PG Attributes的处理器,用于设置PGName attibute,PGType属性,FailEmailTitle属性等。如果我的流程在任何时候都失效,则失败关系将根据Set PG Attributes处理器中设置的属性之一的值路由我的失败流程

这是我当前设置的图表,其中我将所有故障发送到同一个共享PG。 enter image description here

其他选择

如果您认为buletin仅持续5分钟是一个问题,那么您可以使用nifi-app.log,可以将其设置为/opt/nifi/conf/logback.xml文件中的规则填充

  <logger name="org.apache.nifi" level="ERROR"/>
    <logger name="org.apache.nifi.processors" level="DEBUG"/>
    <logger name="org.apache.nifi.processors.standard.LogAttribute" level="ERROR"/>
    <logger name="org.apache.nifi.processors.standard.LogMessage" level="ERROR"/>
    <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="ERROR" />

因此,您可以使用tailFile处理器查看本地日志文件并抓取错误信息或您认为对您有用的内容并从中获得一些意义。


3
投票

每个处理器都应具有一个或多个故障关系。由您决定如何处理故障...在某些情况下,您可以将故障关系路由回同一处理器以继续重试,在其他情况下,您可以将其路由到PutFile处理器并将其写出来到本地磁盘以检查内容,或者您​​可以将其路由到PutEmail处理器以通过电子邮件发送给某人。

你不想做的是自动终止失败关系,因为那时你实际上是在说你想忽略它。

© www.soinside.com 2019 - 2024. All rights reserved.