How to write a DataFrame with a struct column to Elasticsearch using PySpark?


I want to write a DataFrame containing a struct column to Elasticsearch.

from pyspark.sql.functions import to_json, struct, col

df1 = spark.createDataFrame(
    [{"date": "2020.04.10", "approach": "test", "outlier_score": 1, "a": "1", "b": 2},
     {"date": "2020.04.10", "approach": "test", "outlier_score": 0, "a": "2", "b": 1}]
)

df1 = df1.withColumn('details', to_json(struct(
    col('a'),
    col('b')
)))

df1.show(truncate=False)

df1.select('date', 'approach', 'outlier_score', 'details') \
    .write.format("org.elasticsearch.spark.sql") \
    .option('es.resource', 'outliers') \
    .save(mode="append")

The result is:

+---+--------+---+----------+-------------+---------------+
|a  |approach|b  |date      |outlier_score|details        |
+---+--------+---+----------+-------------+---------------+
|1  |test    |2  |2020.04.10|1            |{"a":"1","b":2}|
|2  |test    |1  |2020.04.10|0            |{"a":"2","b":1}|
+---+--------+---+----------+-------------+---------------+   

This does work, but the JSON ends up escaped, so the corresponding details field is not clickable in Kibana.

{
  "_index": "outliers",
  "_type": "_doc",
  "_id": "NuDSA3IBhHa_VjuWENYR",
  "_version": 1,
  "_score": 0,
  "_source": {
    "date": "2020.04.10",
    "approach": "test",
    "outlier_score": 1,
    "details": "{\"a\":\"1\",\"b\":2}"
  },
  "highlight": {
    "date": [
      "@kibana-highlighted-field@2020.04.10@/kibana-highlighted-field@"
    ]
  }
}

I tried adding .option("es.input.json", "true"), but got an exception:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes

If I try to write the data without converting it to JSON, i.e. with the to_json( call removed, I get another exception:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse field [details] of type [text] in document with id 'TuDWA3IBhHa_VjuWFNmX'. Preview of field's value: '{a=2, b=1}';org.elasticsearch.hadoop.rest.EsHadoopRemoteException: illegal_state_exception: Can't get text on a START_OBJECT at 1:68
    {"index":{}}
{"date":"2020.04.10","approach":"test","outlier_score":0,"details":{"a":"2","b":1}}

So the question is: how can I write a nested JSON column from a PySpark DataFrame to Elasticsearch so that the JSON does not get escaped?

elasticsearch pyspark pyspark-dataframes elasticsearch-hadoop elasticsearch-spark
1 Answer

Writing the data without converting it to JSON (i.e., without to_json) should actually not raise an exception. The problem is that a mapping had already been created automatically for the escaped JSON field.

To get rid of the exception, delete or recreate the index. After that, a mapping will be created automatically with the details field as an object. Alternatively, delete all documents containing the details field and change that field's mapping to the object type.
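A sketch of recreating the index with an explicit mapping so that details is indexed as an object rather than text (the field types here are assumptions based on the sample data, and the client calls are shown commented out since they need a running cluster):

```python
import json

index = "outliers"  # index name from the question

# Hypothetical explicit mapping: "details" gets sub-properties instead of
# a "text" type, so Elasticsearch treats it as a nested object.
mapping = {
    "mappings": {
        "properties": {
            "date": {"type": "keyword"},
            "approach": {"type": "keyword"},
            "outlier_score": {"type": "integer"},
            "details": {
                "properties": {
                    "a": {"type": "keyword"},
                    "b": {"type": "integer"},
                }
            },
        }
    }
}

body = json.dumps(mapping)

# With the low-level Python client (connection details are placeholders):
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch("http://localhost:9200")
#   es.indices.delete(index=index, ignore_unavailable=True)
#   es.indices.create(index=index, body=mapping)
```

Letting elasticsearch-hadoop recreate the index on the next write also works; the explicit mapping just gives control over the individual field types.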
