How do I convert ElasticSearch results into a parquet file?


I want to convert a response from ElasticSearch,

co.elastic.clients.elasticsearch.core.SearchResponse<ObjectNode> response

into a parquet file by generating the schema dynamically...

I tried creating the parquet file dynamically with Spark, but it uses a lot of heap space and is not memory efficient: reading the JSON file and building the DataFrame exceeds the heap and throws an OutOfMemoryError.
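If the problem is the schema-inference pass (without an explicit schema, Spark first scans the whole JSON file just to infer the schema before reading it again), one thing I tried sketching is declaring the schema up front so the file is only streamed once. This gives up the dynamic schema generation, but it would rule out the inference pass as the memory culprit. The field names below are made up for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical fields for illustration; nested objects map to nested StructTypes.
StructType schema = new StructType()
        .add("id", DataTypes.StringType)
        .add("timestamp", DataTypes.TimestampType)
        .add("user", new StructType()
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType));

Dataset<Row> dataframe = sparkSession
        .read()
        .schema(schema)   // skips the schema-inference scan over the file
        .json("async.json");

dataframe.write().parquet("async.parquet");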

I have been looking for a way to convert the Elastic response directly into a parquet file whose schema is generated dynamically...
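The closest I have come to that is to skip the intermediate file and hand the hit sources straight to Spark as an in-memory Dataset<String>, letting it infer the schema from each batch. An untested sketch, with sparkSession and response (the SearchResponse<ObjectNode> from above) in scope:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Collect the _source of each hit as a JSON string.
List<String> jsonDocs = response.hits().hits().stream()
        .map(hit -> hit.source().toString())
        .collect(Collectors.toList());

// Spark infers the schema (nested fields included) from this in-memory batch.
Dataset<String> jsonDataset = sparkSession.createDataset(jsonDocs, Encoders.STRING());
Dataset<Row> dataframe = sparkSession.read().json(jsonDataset);

// Append, so every scroll page ends up in the same parquet output.
dataframe.write().mode(SaveMode.Append).parquet("async.parquet");

I am not sure how well this behaves when different scroll pages infer slightly different schemas, which is part of why I am asking.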

I have also seen the schema being generated dynamically by reading the first row of the records, but I am not sure that works when the JSON/Elastic response has a hierarchical or nested structure.
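From what I can tell, the inference is not limited to the first row: Spark samples the records (all of them, unless samplingRatio is lowered) and merges what it finds, and nested JSON objects come back as nested StructTypes. A small check along these lines should show it:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Two records with a nested object; the second has a field the first lacks.
Dataset<String> sample = sparkSession.createDataset(Arrays.asList(
        "{\"id\":1,\"user\":{\"name\":\"a\"}}",
        "{\"id\":2,\"user\":{\"name\":\"b\",\"age\":30}}"), Encoders.STRING());

// printSchema() shows a nested user struct containing both name and age,
// merged from the two records.
sparkSession.read().json(sample).printSchema();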

Here is the code block that generates the schema for the parquet file dynamically via Spark.

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) throws AnalysisException {

    SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();


    // create a DataFrame from the source JSON file
    Dataset<Row> dataframe =
            sparkSession
                    .read()
                    .json("async.json");

 //sparkSession.read().format("co.elastic.clients.elasticsearch.core.SearchResponse<ObjectNode>").load(response);


    // save the json as a parquet file, maintaining the schema information
    dataframe
            .write()
            .parquet("async.parquet");

    // read the parquet file back after it is created;
    // parquet is self-describing, so the schema is preserved,
    // and the result of loading a parquet file is also a DataFrame
    Dataset<Row> parquetFileDataFrame = sparkSession
            .read()
            .parquet("async.parquet");

    // parquet files are used to create a temporary view that is then queried with SQL
    parquetFileDataFrame.createTempView("parquetFile");

    Dataset<Row> namesDataFrame = sparkSession
            .sql("SELECT * FROM parquetFile");
    namesDataFrame.show();
}

And here is the code block where I want to convert the response of the ElasticSearch async scroll into a parquet file whose schema can be generated dynamically...

public Future<JsonObject> asyncScroll(File file, String index, Query query, String[] source,
        String searchId, ProgressListener progressListener) {
    Promise<JsonObject> promise = Promise.promise();

    SearchRequest searchRequest = SearchRequest
            .of(e -> e.index(index).query(query).size(10000).scroll(scr -> scr.time("5m")));
    asyncClient.search(searchRequest, ObjectNode.class).whenCompleteAsync((response, ex) -> {
        if (ex != null) {
            promise.fail(ex);
            return;
        }
        try {
            String scrollId = response.scrollId();

            // This is the part that does not work: Spark has no "String" data source,
            // and load() expects a path, not the response body itself.
            Dataset<Row> dataframe = sparkSession
                    .read()
                    .format("String")
                    .load(response.toString());

            dataframe
                    .write()
                    .parquet("async.parquet");

            Dataset<Row> parquetFileDataFrame = sparkSession
                    .read()
                    .parquet("async.parquet");

            // a parquet-backed temporary view can then be used in SQL statements
            parquetFileDataFrame.createTempView("parquetFile");

            Dataset<Row> namesDataFrame = sparkSession
                    .sql("SELECT * FROM parquetFile");
            namesDataFrame.show();

            // (completion of the promise with the scroll results is omitted here)
        } catch (Exception e) {
            promise.fail(e);
        }
    });
    return promise.future();
}
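An alternative I am also considering is leaving Spark out entirely and streaming each scroll page through parquet-avro's AvroParquetWriter, so only one page of hits is in memory at a time. An untested sketch; the Avro schema and the message field below are made up, since in reality the schema would have to be derived from the first hit:

import java.io.IOException;

import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

static void writePage(SearchResponse<ObjectNode> response) throws IOException {
    // Hypothetical schema for illustration only.
    Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Doc\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"message\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
            .<GenericRecord>builder(new Path("async.parquet"))
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build()) {
        // One Avro record per hit; only the current page is held in memory.
        for (Hit<ObjectNode> hit : response.hits().hits()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", hit.id());
            record.put("message", hit.source().path("message").asText(null));
            writer.write(record);
        }
    }
}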

Any help is much appreciated!

java apache-spark elasticsearch hive vert.x