如何知道nifi进程组是否已使用nipyapi完成数据传输?

问题描述 投票:0回答:2

我必须知道数据传输作业(进程组内的流)的状态,无论它是完成,失败还是正在运行。我想使用nipyapi为Web应用程序执行此操作。

我在nifi有一个进程组,里面有我的nifi流。我正在使用nipyapi安排进程组:

nipyapi.canvas.schedule_process_group(id, True)

现在我想使用nipyapi监控进程组的状态。通过状态我特别想知道它是否仍在运行,失败或完成。

apache-nifi
2个回答
1
投票

NiFi实际上没有可以检查完成的工作概念。一旦启动进程组中的所有组件,它们就会无限期地运行,直到有人停止它们为止。

“完成”或“完成”的概念实际上取决于数据流的作用。例如,如果您的第一个处理器是GetFile,那么一旦该处理器运行,它将监视目录中的文件,直到某人停止处理器为止。当处理器运行时,它无法知道是否会有更多文件,或者它是否已经看到将在目录中删除的所有文件。这种知识只有在那些文件放在那里的人/任何人都知道。

要确定故障,您需要在数据流中执行某些操作以捕获故障。大多数处理器都存在故障关系,因此您需要将这些处理器路由到某处并采取一些措施来跟踪故障。


0
投票

我想我找到了解决这个问题的好方法。这就是我解决它的方式。所以我有一个mysql数据库,基本上跟踪所有要传输的文件。数据库表将有2列。一个用于Filename(假设是唯一的)并标记文件是否已被传输(True和False)。 For Nifi Screenshot click here

我们有3款处理器。 First: listSFTP and putMySQL Second: getSFTP and putHDFS Third: listHDFS and putHDFS 第一部分负责列出SFTP中的文件。它获取所有文件并在mysql中添加一行,文件名为'X','False'尚未传输。 insert into NifiTest.Jobs values('${filename}', 0); 第三部分为HD​​FS做同样的事情。它将插入Transferred = True或更新,如果已存在具有相同文件名的行。 insert into NifiTest.Jobs values('${filename}', 1) on duplicate key update TRANSFERRED = 1; 第2部分除了将文件发送到HDFS之外什么都不做。

现在检查数据传输作业何时完成。 您将一起启动整个流程组。当您查询数据库并获得所有Transferred = 1时,这意味着作业已完成。可能会觉得有些情况可能会失败,但是当您仔细考虑所有情况时,您会发现它会处理所有情况。如果我错了或者可以对此解决方案进行一些改进,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.