如何仅在队列变空后才触发下一个处理器

问题描述 投票:0回答:1

querydatabasetable --> 从源获取大约 2GB 数据。

convertavrotojson ---> 将avro从at数据转换为json格式

putdatabaserecord --> 将所有 2GB(超过 1 千万条记录)摄取到 postgresql。因为队列阈值为 10,000。为了成功 putdatabaserecord,我已将流程文件过期 2 秒。因此,在摄取所有数据并且流文件在到期后为空之后,我需要触发下一个executesql处理器来运行存储过程。那么如何在 putdatabserecord 处理器之后触发下一个处理器。

我使用rest api来检查队列大小是否小于0或大于0。如果小于0则仅调用存储过程。但如何仅在摄取完成后触发invokehttp(rest api)或executesql来调用过程。

rest triggers apache-nifi processor
1个回答
0
投票

如果您使用

Max Rows Per Flow File
参数,则 QueryDatabaseTable 会将以下属性写入结果流文件中:

fragment.identifier  - the same result set will have the same value
fragment.count       - number of fragments
fragment.index       - index of current fragment

PutDatabaseRecord
之后,您可以使用大约以下参数将
success
关系重定向到 MergeContent

Merge Strategy = Defragment
Merge Format   = ZIP

当所有片段组合成一个 zip 文件时,这应该等待 - 意味着所有片段均已处理。该处理器的输出就是您的触发器。

如我所见

querydatabasetable -->
convertavrotojson  -->
putdatabaserecord  -->
mergecontent       -->
execute sproc...
© www.soinside.com 2019 - 2024. All rights reserved.