使用nifi,如何将JSON对象插入Postgres jsonb列?

问题描述 投票:0回答:1

目前我的nifi流程如下:

ListS3 -> RouteOnAttribute -> RouteOnAttribute -> FetchS3Object -> ConvertRecord -> PutDatabaseRecord

我在 s3 存储桶中列出 CSV 文件,然后在两个不同的属性处理器上路由以选择要放入 Postgres 数据库中的文件。然后我从 s3 获取 CSV 文件。然后,我使用 ConvertRecord 处理器将 CSV 文件转换为 json 对象行,如下所示:

{"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}
{“ideez”:“2”,“姓名”:“简·史密斯”,“年龄”:“25”,“城市”:“洛杉矶”}
...
我想要这样做的原因是因为有许多 CSV 文件具有相似的标题(我路由以使用正确的标题)并且具有大部分相似的列。但有许多文件具有不同的列名称。每天都在变化。因此,将它们作为 json 对象插入到 Postgres 中的 jsonb 列中最适合我。

以下是我的 JsonRecordSetWriter 以及 PutDatabaseRecord 的设置方式:



以下是我收到的一些错误:

  • 无法将记录放入数据库 flowfile[filename=this/that/then/this/filename] 路由到 失败:java.sql.SQLDataException:记录中没有任何字段 映射到 public.test_table 表 NORMALIZED 定义的列 列:ID、JSON 内容
  • putdatabaserecord[id=uuid] 记录没有所需的值 ‘json_content’列

如果我尝试用“json_content”封装每一行:

{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{“json_content”:{“ideez”:“2”,“姓名”:“简·史密斯”,“年龄”:“25”,“城市”:“洛杉矶”}},
...

我收到以下错误:

“错误:json 类型的输入语法无效详细信息:令牌“MapRecord”无效。其中:JSONdata,第 1 行:MapRecord…未命名门户参数 $1=‘…’

有没有人有幸做过类似的过程?也许,我应该追求更好的过程。谢谢。

postgresql etl apache-nifi jsonb
1个回答
0
投票

我认为你真的很接近!但问题是你的内容:

{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{"json_content": {"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}}

将 json 内容作为实际的 json 对象,NiFi 将其解释为记录。因此,关于 MapRecord 的错误。您需要“字符串化”该字段。我认为我可以研究一些解决方案,以便使 JSON 字符串化更加方便。但现在,您可以使用原始字符串操作来做到这一点。我会在 PutDatabaseRecord 之前使用 ReplaceText 处理器,并以这种方式配置它:

Replacement Strategy = Regex Replace
Search Value = (^.*$)
Replacement Value = {"json_content": "${'$1':escapeJson():replace('\\', '\\\\')}"}
Character Set = UTF-8
Maximum Buffer Size = 1 MB
Evaluation Mode = Line-by-Line
Line-by-Line Evaluation Mode = All

这应该处理将其转换为字符串并正确转义它。

© www.soinside.com 2019 - 2024. All rights reserved.