从S3中读取Spark中的实木复合地板文件

问题描述 投票:1回答:1

我正在以parquet格式从S3读取数据,然后将此数据作为DataFrame处理。问题是如何有效地迭代DataFrame中的行?我知道方法collect将数据加载到内存中,因此,尽管我的DataFrame并不大,但我还是希望避免将完整的数据集加载到内存中。如何优化给定的代码?另外,我正在使用索引来访问DataFrame中的列。我可以按列名访问它们(我知道它们)吗?

DataFrame parquetFile = sqlContext.read().parquet("s3n://"+this.aws_bucket+"/"+this.aws_key_members);
parquetFile.registerTempTable("mydata");
DataFrame eventsRaw = sqlContext.sql("SELECT * FROM mydata");
Row[] rddRows = eventsRaw.collect();
for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx)
{
   Map<String, String> props = new HashMap<>();
   props.put("field1", rddRows[rowIdx].get(0).toString());
   props.put("field2", rddRows[rowIdx].get(1).toString());
   // further processing
}
java apache-spark amazon-s3 spark-dataframe
1个回答
0
投票

您可以在spark中使用Map功能。您可以迭代整个数据框而无需收集数据集/数据框。

Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age 
BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map((MapFunction<Row, String>) row -> "Name:" + row.getString(0),Encoders.STRING());

namesDS.show();

如果正在执行的操作很复杂,则可以创建一个地图函数。

 // Map function
 Row doSomething(Row row){
   // get column
     String field = row.getAs(COLUMN)
// construct a new row and add all the existing/modified columns in the row .  
return row.
    }

现在可以将此映射函数调用为数据框的映射函数

StructType structType = dataset.schema();
namesDF.map((MapFunction<Row, Row>)dosomething,
        RowEncoder.apply(structType))

来源:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

© www.soinside.com 2019 - 2024. All rights reserved.