根据spark中的列值分割数据集

问题描述 投票:0回答:3

我正在尝试根据制造商列内容将数据集拆分为不同的数据集。非常慢
请提出一种改进代码的方法,以便它可以更快地执行并减少Java代码的使用。

List<Row> lsts= countsByAge.collectAsList();
                                
for(Row lst:lsts) {
     String man = lst.toString();
     man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
     Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
     DF.show();                                      
}

代码、输入和输出数据集如下所示。

package org.sparkexample;

import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

public class GroupBy {

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\winutils");
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
        sc.setLogLevel("ERROR");
                        
        Dataset<Row> src= sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("sample.csv");
                                    
                          
        Dataset<Row> unq_manf=src.select("Manufacturer").distinct();
        List<Row> lsts= unq_manf.collectAsList();
                        
        for(Row lst:lsts) {
             String man = lst.toString();
             man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
             Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
             DF.show();          
        }
    }

}

输入表

+------+------------+--------------------+---+
|ItemID|Manufacturer|       Category name|UPC|
+------+------------+--------------------+---+
|   804|         ael|Brush & Broom Han...|123|
|   805|         ael|Wheel Brush Parts...|124|
|   813|         ael|      Drivers Gloves|125|
|   632|        west|       Pipe Wrenches|126|
|   804|         bil|     Masonry Brushes|127|
|   497|        west|   Power Tools Other|128|
|   496|        west|   Power Tools Other|129|
|   495|         bil|           Hole Saws|130|
|   499|         bil|    Battery Chargers|131|
|   497|        west|   Power Tools Other|132|
+------+------------+--------------------+---+

输出

+------------+
|Manufacturer|
+------------+
|         ael|
|        west|
|         bil|
+------------+

+------+------------+--------------------+---+
|ItemID|Manufacturer|       Category name|UPC|
+------+------------+--------------------+---+
|   804|         ael|Brush & Broom Han...|123|
|   805|         ael|Wheel Brush Parts...|124|
|   813|         ael|      Drivers Gloves|125|
+------+------------+--------------------+---+

+------+------------+-----------------+---+
|ItemID|Manufacturer|    Category name|UPC|
+------+------------+-----------------+---+
|   632|        west|    Pipe Wrenches|126|
|   497|        west|Power Tools Other|128|
|   496|        west|Power Tools Other|129|
|   497|        west|Power Tools Other|132|
+------+------------+-----------------+---+

+------+------------+----------------+---+
|ItemID|Manufacturer|   Category name|UPC|
+------+------------+----------------+---+
|   804|         bil| Masonry Brushes|127|
|   495|         bil|       Hole Saws|130|
|   499|         bil|Battery Chargers|131|
+------+------------+----------------+---+
java apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
3个回答
0
投票

在这种情况下你有两个选择:

  1. 首先您必须收集唯一的制造商值,然后映射 在结果数组上:

    val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")    
    val brands = df.select("k").distinct.collect.flatMap(_.toSeq)
    val BrandArray = brands.map(brand => df.where($"k" <=> brand))
    BrandArray.foreach { x =>
    x.show()
    println("---------------------------------------")
    }
    
  2. 您还可以根据制造商保存数据框。

    df.write.partitionBy("hour").saveAsTable("parquet")


0
投票

如果您需要经常根据制造商进行查询,最好不要按制造商拆分数据集/数据帧,而是使用制造商作为分区键来编写数据帧

如果您仍然想要基于其中一个列值的单独数据帧,使用 pyspark 和 Spark 2.0+ 的方法之一可能是 -

from pyspark.sql import functions as F

df = spark.read.csv("sample.csv",header=True)

# collect list of manufacturers
manufacturers = df.select('manufacturer').distinct().collect()

# loop through manufacturers to filter df by manufacturers and write it separately 
for m in manufacturers:
    df1 = df.where(F.col('manufacturers')==m[0])
    df1[.repartition(repartition_col)].write.parquet(<write_path>,[write_mode])


0
投票

注意 - 此解决方案位于

scala
中。您可以将其转换为
Java

使用

spark.scheduler.mode
选项
FAIR
 更改为 
conf

spark-shell --conf spark.scheduler.mode=FAIR

spark-submit --conf spark.scheduler.mode=FAIR

给定输入

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(
    (804,"ael","Brush & Broom Han","123"),
    (805,"ael","Wheel Brush Parts","124"),
    (813,"ael","Drivers Gloves","125"),
    (632,"west","Pipe Wrenches","126"),
    (804,"bil","Masonry Brushes","127"),
    (497,"west","Power Tools Other","128"),
    (496,"west","Power Tools Other","129"),
    (495,"bil","Hole Saws","130"),
    (499,"bil","Battery Chargers","131"),
    (497,"west","Power Tools Other","132")
)
.toDF("ItemID","Manufacturer"," Category name","UPC")

// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [ItemID: int, Manufacturer: string ... 2 more fields]

使用并行收集在spark中运行多个作业

scala> import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.ForkJoinTaskSupport
scala> import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinPool

收集

manufacturers

的独特值
scala> val manufacturers = df.select("Manufacturer").as[String].collect.distinct.toSeq
manufacturers: Seq[String] = WrappedArray(ael, west, bil)

要处理的线程数

scala> val numThreads = manufacturers.size
numThreads: Int = 3

并行集合

scala> val manufacturersPar = manufacturers.par
manufacturersPar: scala.collection.parallel.ParSeq[String] = ParArray(ael, west, bil)

创建任务支持并传递要运行的线程数。

scala> val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
taskSupport: scala.collection.parallel.ForkJoinTaskSupport = scala.collection.parallel.ForkJoinTaskSupport@10706411

为并行收集分配任务支持。

scala> manufacturersPar.tasksupport = taskSupport
manufacturersPar.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@10706411

终于运行代码了。首先使用

count
然后使用
show
方法作为 show 方法将不会触发任何作业。

scala> :paste
// Entering paste mode (ctrl-D to finish)

manufacturersPar
.foreach{ f =>
    spark.sparkContext.setLocalProperty("callSite.short", s"Manufacturer=${f}")

    val fdf = df.filter($"Manufacturer" === f)

    println(fdf.count())
    fdf.show(false)
}

// Exiting paste mode, now interpreting.

3
4
3
+------+------------+-----------------+---+
|ItemID|Manufacturer| Category name   |UPC|
+------+------------+-----------------+---+
|804   |ael         |Brush & Broom Han|123|
|805   |ael         |Wheel Brush Parts|124|
|813   |ael         |Drivers Gloves   |125|
+------+------------+-----------------+---+

+------+------------+----------------+---+
|ItemID|Manufacturer| Category name  |UPC|
+------+------------+----------------+---+
|804   |bil         |Masonry Brushes |127|
|495   |bil         |Hole Saws       |130|
|499   |bil         |Battery Chargers|131|
+------+------------+----------------+---+

+------+------------+-----------------+---+
|ItemID|Manufacturer| Category name   |UPC|
+------+------------+-----------------+---+
|632   |west        |Pipe Wrenches    |126|
|497   |west        |Power Tools Other|128|
|496   |west        |Power Tools Other|129|
|497   |west        |Power Tools Other|132|
+------+------------+-----------------+---+


scala>

您可以在

spark ui

中检查多个作业是否已启动

© www.soinside.com 2019 - 2024. All rights reserved.