我正在从SFTP站点读取CSV文件并使用Nifi将其加载到mysql数据库。
我有以下工作流程似乎工作正常。在开始数据加载之前,我只需要帮助弄清楚如何截断表。
Nifi流程:
ListSFTP - > FetchSFTP - > InferAvroSchema - > ConvertCSVtoAvro - > ConvertAvrotoJSON - > SplitJSON - > ConvertJSONtoSQL - > PutSQL
这个流似乎工作正常,但每次我运行它,我需要先截断表,然后开始加载。
有人可以帮我提供一些有关如何实现这一目标的信息。或者是否有比我写的更好的流程,请指教。
谢谢,adil
在我进入“插入前截断”部分之前,我建议用PutDatabaseRecord替换从ConvertCSVtoAvro到PutSQL的所有内容。它基本上将“ConvertXtoSQL-> PutSQL”放在一起,并且只是为了执行SQL语句而减少了对所有这些转换的需求。
插入部分之前的截断有点棘手。从NiFi 1.5.0开始(在撰写本文时尚未发布)通过NIFI-4522,您将能够在执行SQL命令时保留流文件内容。这意味着在PutDatabaseRecord之前,您可以将PutSQL的“SQL语句”设置为“TRUNCATE myTable”。 PutSQL将发出TRUNCATE但是传递流文件。
目前我能想到的唯一解决方法是ExecuteScript处理器。如果使用PutDatabaseRecord,您可能必须按名称获取DBCPConnectionPool(有关示例,请参阅my blog post)并自行执行TRUNCATE语句,然后传入传入的流文件。如果使用PutSQL上面的流程,您可以获取传入的流文件,然后只有当fragment.index为零并且您在连接上使用Prioritizer时才写出“TRUNCATE myTable”流文件,然后传输传入流文件。