I want to convert a response from ElasticSearch,
co.elastic.clients.elasticsearch.core.SearchResponse<ObjectNode> response
into a Parquet file with a dynamically generated schema...
I tried creating the Parquet file dynamically with Spark, but it is not memory-efficient: reading the JSON file and building the DataFrame consumes so much heap space that it throws an OutOfMemoryError.
I have been looking for a way to convert the Elastic response directly into a Parquet file whose schema is generated on the fly...
I have also seen the schema being generated dynamically by reading the first row of the records, but I am not sure this works when the JSON or Elastic response has a hierarchical/nested structure.
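For reference, Spark's JSON reader does not look only at the first row: it scans the input (every row by default, tunable with the samplingRatio option) and infers nested StructType/ArrayType fields as well. A minimal sketch of that behavior, assuming a hypothetical nested.json input file:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NestedSchemaInference {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("nested schema inference")
                .config("spark.master", "local")
                .getOrCreate();

        // Spark scans (a sample of) the JSON input and infers nested
        // struct/array types; samplingRatio < 1.0 trades accuracy for speed.
        Dataset<Row> df = spark.read()
                .option("samplingRatio", "0.1") // illustrative value: sample 10% of rows
                .json("nested.json");           // hypothetical input file

        // printSchema() shows the inferred hierarchy,
        // e.g. root |-- user: struct |-- address: struct |-- city: string
        df.printSchema();
    }
}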
Here is the code block that dynamically generates the schema for a Parquet file through Spark:
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) throws AnalysisException {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();

    // create a DataFrame from the source; the schema is inferred from the JSON
    Dataset<Row> dataframe = sparkSession
            .read()
            .json("async.json");
    //sparkSession.read().format("co.elastic.clients.elasticsearch.core.SearchResponse<ObjectNode>").load(response);

    // save the JSON as a Parquet file, preserving the schema information
    dataframe
            .write()
            .parquet("async.parquet");

    // read the Parquet file back;
    // Parquet is self-describing, so the schema is preserved,
    // and the result of loading a Parquet file is also a DataFrame
    Dataset<Row> parquetFileDataFrame = sparkSession
            .read()
            .parquet("async.parquet");

    // register a temporary view so the Parquet data can be queried with SQL
    parquetFileDataFrame.createTempView("parquetFile");
    Dataset<Row> namesDataFrame = sparkSession
            .sql("SELECT * FROM parquetFile");
    namesDataFrame.show();
}
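One way to avoid the intermediate JSON file altogether is to hand the hits to Spark in memory: DataFrameReader.json(Dataset<String>) accepts a Dataset of JSON strings and infers the schema, nested fields included, from them. A minimal sketch under that assumption (toSparkDataFrame is an illustrative helper name, not an existing API):

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.node.ObjectNode;

import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;

public class ResponseToDataFrame {

    // Hypothetical helper: turn the hits of one response into a DataFrame
    // without touching the file system.
    static Dataset<Row> toSparkDataFrame(SparkSession spark,
                                         SearchResponse<ObjectNode> response) {
        // Serialize each hit's _source back to a JSON string.
        List<String> jsonLines = response.hits().hits().stream()
                .map(Hit::source)
                .filter(Objects::nonNull)
                .map(ObjectNode::toString)
                .collect(Collectors.toList());

        // Build an in-memory Dataset<String> of JSON documents...
        Dataset<String> jsonDataset = spark.createDataset(jsonLines, Encoders.STRING());

        // ...and let Spark infer the (possibly nested) schema from it.
        return spark.read().json(jsonDataset);
    }
}

Note that this still materializes one batch of hits on the driver, so it should be applied per scroll page rather than to an entire index at once.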
And here is the code block where I want to convert the response of an ElasticSearch asynchronous scroll into a Parquet file whose schema is generated dynamically...
public Future<JsonObject> asyncScroll(File file, String index, Query query, String[] source,
        String searchId, ProgressListener progressListener) {
    Promise<JsonObject> promise = Promise.promise();
    SearchRequest searchRequest = SearchRequest
            .of(e -> e.index(index).query(query).size(10000).scroll(scr -> scr.time("5m")));
    asyncClient.search(searchRequest, ObjectNode.class).whenCompleteAsync((response, ex) -> {
        if (ex != null) {
            // fail the future instead of silently swallowing the error
            promise.fail(ex);
            return;
        }
        try {
            // LOGGER.info("response : {}", response.toString());
            String scrollId = response.scrollId();
            // This is the step that does not work: "String" is not a Spark
            // data source, and load() expects a path, not the response body.
            Dataset<Row> dataframe = sparkSession
                    .read()
                    .format("String")
                    .load(response.toString());
            // save the response as a Parquet file, keeping the schema
            dataframe
                    .write()
                    .parquet("async.parquet");
            // read the Parquet file back
            Dataset<Row> parquetFileDataFrame = sparkSession
                    .read()
                    .parquet("async.parquet");
            // Parquet files are used to create a temporary view that is then
            // queried with SQL statements
            parquetFileDataFrame
                    .createTempView("parquetFile");
            Dataset<Row> namesDataFrame = sparkSession
                    .sql("SELECT * FROM parquetFile");
            namesDataFrame.show();
            // placeholder payload; complete with whatever the caller expects
            promise.complete(new JsonObject().put("scrollId", scrollId));
        } catch (Exception e) {
            promise.fail(e);
        }
    });
    return promise.future();
}
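For the scroll case, one memory-bounded pattern is to convert each scroll page in memory (as in the helper above) and append it to the same Parquet directory, so the driver only ever holds one page of hits. A minimal sketch, assuming a synchronous ElasticsearchClient for brevity (scrollToParquet, appendPageToParquet, and outputPath are illustrative names, not an existing API):

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.node.ObjectNode;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.ScrollResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;

public class ScrollToParquet {

    // Hypothetical end-to-end sketch: scroll through an index page by page
    // and append each page to one Parquet directory.
    static void scrollToParquet(ElasticsearchClient esClient, SparkSession spark,
                                String index, String outputPath) throws Exception {
        SearchResponse<ObjectNode> page = esClient.search(s -> s
                .index(index)
                .size(1000)                 // illustrative page size
                .scroll(t -> t.time("5m")),
                ObjectNode.class);

        String scrollId = page.scrollId();
        List<Hit<ObjectNode>> hits = page.hits().hits();

        while (hits != null && !hits.isEmpty()) {
            appendPageToParquet(spark, hits, outputPath);

            // fetch the next page; effectively final copy for the lambda
            final String currentScrollId = scrollId;
            ScrollResponse<ObjectNode> next = esClient.scroll(s -> s
                    .scrollId(currentScrollId)
                    .scroll(t -> t.time("5m")),
                    ObjectNode.class);
            scrollId = next.scrollId();
            hits = next.hits().hits();
        }
    }

    // Convert one page of hits to a DataFrame and append it, so the driver
    // never holds more than one page of documents at a time.
    static void appendPageToParquet(SparkSession spark, List<Hit<ObjectNode>> hits,
                                    String outputPath) {
        List<String> jsonLines = hits.stream()
                .map(Hit::source)
                .filter(Objects::nonNull)
                .map(ObjectNode::toString)
                .collect(Collectors.toList());

        Dataset<String> jsonDataset = spark.createDataset(jsonLines, Encoders.STRING());

        spark.read().json(jsonDataset)
                .write()
                .mode(SaveMode.Append)      // append each scroll page
                .parquet(outputPath);
    }
}

If different pages happen to infer slightly different schemas, the result may need to be read back with the Parquet mergeSchema option, or written with an explicit schema instead.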
Any help is greatly appreciated!