Input Data:
{"id":"1",
 "a":"abc",
 "b":"bcd",
 "c":"cde",
 "payload": Map(
   "path_1/file1" -> "{"a":"abc", "x":"xyz"}",
   "path_1/file2" -> "{"b":"abc", "y":"xyz"}",
   "path_2/file3" -> "{"c":"abc", "z":"xyz"}",
   "path_2/file4" -> "{"d":"abc", "zz":"xyz"}",
   "path_2/wrong_json5" -> ""d":"abc" "zz":"xyz"}"   <- intentionally invalid JSON
 ),
 "d":"def"}
Output Data DataFrame:
+---+-----+-----+-----+-----+-----------+-----------+------------------------+
|id | a   | b   | c   | d   | file_name | file_path | data                   |
+---+-----+-----+-----+-----+-----------+-----------+------------------------+
| 1 | abc | bcd | cde | def | file1     | path_1    | {"a":"abc","x":"xyz"}  |
| 1 | abc | bcd | cde | def | file2     | path_1    | {"b":"abc","y":"xyz"}  |
| 1 | abc | bcd | cde | def | file3     | path_2    | {"c":"abc","z":"xyz"}  |
| 1 | abc | bcd | cde | def | file4     | path_2    | {"d":"abc","zz":"xyz"} |
+---+-----+-----+-----+-----+-----------+-----------+------------------------+
Error Detail DataFrame:
+---+-----+-----+-----+-----+-------------+-----------+--------------+
|id | a   | b   | c   | d   | file_name   | file_path | data         |
+---+-----+-----+-----+-----+-------------+-----------+--------------+
| 1 | abc | bcd | cde | def | wrong_json5 | path_2    | invalid json |
+---+-----+-----+-----+-----+-------------+-----------+--------------+
How can I parse, in Scala, all the JSON strings in the Map along with their meta details, capture the error messages in a separate DataFrame, and have the Spark streaming job continue with the next id? Please help produce the output above from the given input.
You can use PERMISSIVE mode to capture corrupted records: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html
When it meets a corrupted record, it puts the malformed string into the field configured by columnNameOfCorruptRecord and sets the other fields to null.
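A minimal sketch of the whole pipeline, assuming Spark SQL in a local batch session; the variable and column names (`inputDF`, `full_path`, `raw_json`, `parsed`) are illustrative, not from the question. It explodes the payload map, splits the map key into `file_path`/`file_name`, parses each JSON string with `from_json` in PERMISSIVE mode plus a `_corrupt_record` column, and splits the result into the two DataFrames shown above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("payload-parse").master("local[*]").getOrCreate()
import spark.implicits._

// Sample input mirroring the record above: payload maps "path/file" -> raw JSON string.
val inputDF = Seq(
  ("1", "abc", "bcd", "cde", "def", Map(
    "path_1/file1"       -> """{"a":"abc","x":"xyz"}""",
    "path_1/file2"       -> """{"b":"abc","y":"xyz"}""",
    "path_2/file3"       -> """{"c":"abc","z":"xyz"}""",
    "path_2/file4"       -> """{"d":"abc","zz":"xyz"}""",
    "path_2/wrong_json5" -> """"d":"abc" "zz":"xyz"}"""   // intentionally invalid
  ))
).toDF("id", "a", "b", "c", "d", "payload")

// 1. One row per map entry; split "path_1/file1" into file_path / file_name.
val exploded = inputDF
  .select($"id", $"a", $"b", $"c", $"d",
    explode($"payload").as(Seq("full_path", "raw_json")))
  .withColumn("file_path", substring_index($"full_path", "/", 1))
  .withColumn("file_name", substring_index($"full_path", "/", -1))

// 2. Parse in PERMISSIVE mode with a corrupt-record column: malformed JSON
//    lands in _corrupt_record instead of failing the job.
val payloadSchema = StructType(
  Seq("a", "b", "c", "d", "x", "y", "z", "zz").map(StructField(_, StringType)) :+
    StructField("_corrupt_record", StringType))

val parsed = exploded.withColumn("parsed",
  from_json($"raw_json", payloadSchema,
    Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_corrupt_record")))

val metaCols = Seq($"id", $"a", $"b", $"c", $"d", $"file_name", $"file_path")

// 3. Split into the Data and Error Detail DataFrames.
val dataDF = parsed.filter($"parsed._corrupt_record".isNull)
  .select(metaCols :+ $"raw_json".as("data"): _*)
val errorDF = parsed.filter($"parsed._corrupt_record".isNotNull)
  .select(metaCols :+ lit("invalid json").as("data"): _*)

dataDF.show(false)
errorDF.show(false)
```

Because the bad record ends up as a row in `errorDF` rather than an exception, the job keeps processing subsequent ids; in a Structured Streaming job the same transformations should apply unchanged to a DataFrame obtained from `spark.readStream`, with each of the two results written to its own sink.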