querydatabasetable --> 从源获取大约 2GB 数据。
convertavrotojson ---> 将avro从at数据转换为json格式
putdatabaserecord --> 将所有 2GB(超过 1 千万条记录)摄取到 postgresql。因为队列阈值为 10,000。为了成功 putdatabaserecord,我已将流程文件过期 2 秒。因此,在摄取所有数据并且流文件在到期后为空之后,我需要触发下一个executesql处理器来运行存储过程。那么如何在 putdatabserecord 处理器之后触发下一个处理器。
我使用rest api来检查队列大小是否小于0或大于0。如果小于0则仅调用存储过程。但如何仅在摄取完成后触发invokehttp(rest api)或executesql来调用过程。
如果您使用
Max Rows Per Flow File
参数,则 QueryDatabaseTable 会将以下属性写入结果流文件中:
fragment.identifier - the same result set will have the same value
fragment.count - number of fragments
fragment.index - index of current fragment
在
PutDatabaseRecord
之后,您可以使用大约以下参数将 success
关系重定向到 MergeContent:
Merge Strategy = Defragment
Merge Format = ZIP
当所有片段组合成一个 zip 文件时,这应该等待 - 意味着所有片段均已处理。该处理器的输出就是您的触发器。
如我所见
querydatabasetable -->
convertavrotojson -->
putdatabaserecord -->
mergecontent -->
execute sproc...