如何使用java api在Apache Spark Dataset中使用desc进行排序?

问题描述 投票:3回答:3

我正在使用 SparkSession 读取文件，然后拆分单词并统计每个单词的出现次数。我需要按降序（desc）显示结果。

SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();

// Split every line on whitespace and '.', strip commas, and drop the empty
// tokens that splitting produces (consecutive separators, a lone ",", ...);
// otherwise "" would be counted as a word.
JavaRDD<Word> textFile = sparkSession
            .read()
            .textFile("/Users/myname/Documents/README.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
            .map(w -> w.replace(",", ""))
            .filter(w -> !w.isEmpty())
            .map(w -> {
                Word word = new Word();
                word.setWord(w);
                return word;
            });

    Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
    // Sort by the "count" column, highest first. Equivalent forms:
    //   .orderBy(org.apache.spark.sql.functions.desc("count"))
    //   .orderBy(desc("count"))  // with: import static org.apache.spark.sql.functions.*;
    df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();

当我使用 org.apache.spark.sql.functions.col("count").desc() 时可以正常工作，但按照 https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String) 中的定义直接使用 desc(...) 却不行，例如：

df.sort(asc("dept"), desc("age"))

《How to sort by column in descending order in Spark SQL?》中的方法也不起作用，我想那是 Scala 的写法。Java 中等价的写法是什么？

java apache-spark apache-spark-sql apache-spark-dataset
3个回答
9
投票

在Java中,您必须以这种方式导入包:

import static org.apache.spark.sql.functions.*

2
投票

按照 Spark 的 Java 文档，你的代码应该是可以工作的，但你没有贴出 import 语句。desc() 和 asc() 是 org.apache.spark.sql.functions 类中的静态方法，如果没有导入 functions，就需要写全限定名：org.apache.spark.sql.functions.asc("dept")、org.apache.spark.sql.functions.desc("age")。

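或者使用静态导入：import static org.apache.spark.sql.functions.*;（注意：普通的 import org.apache.spark.sql.functions.* 只会导入 functions 的嵌套类型，并不会导入 asc/desc 这些静态方法。）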


0
投票

我使用的是 Spark 2.4.0。如果启用了 Kryo 序列化并设置了 spark.kryo.registrationRequired=true，执行排序（orderBy）时可能会因下列类未注册而报错。有两种解决办法：

  1. 将配置项 spark.kryo.registrationRequired 设置为 false；

或者

  2. 保持该配置不变，并手动注册排序时用到的类：


    kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
            kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);

     kryo.register(Class.forName("scala.math.Ordering$$anon$4"));
                kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"));
                kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8"));
                kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));

private static SparkSession session;

    /**
     * Builds two small bean-backed DataFrames (persons and professions), inner-joins
     * them on the person key, and prints every intermediate Dataset. The joined rows
     * are then projected, ordered by {@code personId} ascending and viewed as
     * {@link Jointure} beans before the JVM is terminated with {@code System.exit(0)}.
     *
     * @param args currently unused (configuration loading is commented out)
     */
    public static void main(String[] args) {
        List<Person> personsList = samplePersons();
        List<Profession> professionList = sampleProfessions();

        // Configuration loading is currently disabled:
        // SparkAppConfiguration.load(args);
        // LaunchArgsEncoder launchArgs = SparkAppConfiguration.getLaunchArgs();

        // Initialise the session
        session = SparkUtils.initSession("test jointure");

        // Java lists -> DataFrames. The key columns are renamed so that the joined
        // result ends up with distinct names (personId on one side, personFk on the other).
        Dataset<Row> rowPerson = session.createDataFrame(personsList, Person.class);
        printAndShow("rowPerson.show();", rowPerson);

        Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
        printAndShow("personRenamed.show();", personRenamed);

        Dataset<Row> rowProfession = session.createDataFrame(professionList, Profession.class);
        printAndShow("rowProfession.show();", rowProfession);

        Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
        printAndShow("professionRenamed.show();", professionRenamed);

        // Inner join: person.personId == profession.personFk
        Dataset<Row> innerJoinData = personRenamed.join(
                professionRenamed,
                personRenamed.col("personId").equalTo(professionRenamed.col("personFk")));
        printAndShow("innerJoinData.show();", innerJoinData);

        // Keep the wanted columns, sort ascending by personId, view as Jointure beans.
        Dataset<Jointure> joinResult = innerJoinData
                .select("personId", "nom", "courriel", "id", "profession")
                .orderBy(org.apache.spark.sql.functions.col("personId").asc())
                .as(Encoders.bean(Jointure.class));
        printAndShow("joinResult.show();", joinResult);

        System.out.println("joinResult.printSchema();");
        joinResult.printSchema();

        System.exit(0);
    }

    /** Returns the four sample persons (ids 1 to 4) used as the left side of the join. */
    private static List<Person> samplePersons() {
        return Arrays.asList(
                new Person(1, "[email protected]", "nom_1"),
                new Person(2, "[email protected]", "nom_2"),
                new Person(3, "[email protected]", "nom_3"),
                new Person(4, "[email protected]", "nom_4"));
    }

    /** Returns the nine sample professions; the second argument is the owning person's id. */
    private static List<Profession> sampleProfessions() {
        return Arrays.asList(
                new Profession(1, 2, "profession_4"),
                new Profession(2, 1, "profession_2"),
                new Profession(3, 1, "profession_5"),
                new Profession(4, 2, "profession_2"),
                new Profession(5, 2, "profession_5"),
                new Profession(6, 3, "profession_7"),
                new Profession(7, 3, "profession_2"),
                new Profession(8, 4, "profession_2"),
                new Profession(9, 4, "profession_7"));
    }

    /** Prints {@code label} on its own line, then calls {@code show()} on {@code data}. */
    private static void printAndShow(String label, Dataset<?> data) {
        System.out.println(label);
        data.show();
    }


 /**
  * JavaBean describing a person, turned into a DataFrame with
  * {@code createDataFrame(list, Person.class)}.
  *
  * <p>Spark infers a bean's schema through reflection on its getter/setter pairs, so the
  * accessors below are what make each field show up as a column.
  *
  * <p>NOTE(review): the sample code builds persons with a 3-argument constructor
  * ({@code new Person(id, courriel, nom)}) and the printed schema shows only
  * {@code courriel | id | nom}. This class (6-argument constructor, field {@code personId})
  * does not match that call — confirm which version of the bean was actually run.
  */
 public class Person implements Serializable {

     private static final long serialVersionUID = 7327130742162877288L;

     private long personId;
     private String nom;
     private String prenom;
     private String courriel;
     private String profession;
     private String ville;

     /** Creates a fully populated person. */
     public Person(long personId, String nom, String prenom, String courriel, String profession, String ville) {
         super();
         this.personId = personId;
         this.nom = nom;
         this.prenom = prenom;
         this.courriel = courriel;
         this.profession = profession;
         this.ville = ville;
     }

     /** No-arg constructor required by the JavaBean convention. */
     public Person() {
         super();
     }

     public long getPersonId() {
         return personId;
     }

     public void setPersonId(long personId) {
         this.personId = personId;
     }

     public String getNom() {
         return nom;
     }

     public void setNom(String nom) {
         this.nom = nom;
     }

     public String getPrenom() {
         return prenom;
     }

     public void setPrenom(String prenom) {
         this.prenom = prenom;
     }

     public String getCourriel() {
         return courriel;
     }

     public void setCourriel(String courriel) {
         this.courriel = courriel;
     }

     public String getProfession() {
         return profession;
     }

     public void setProfession(String profession) {
         this.profession = profession;
     }

     public String getVille() {
         return ville;
     }

     public void setVille(String ville) {
         this.ville = ville;
     }
 }

   /**
    * JavaBean for a profession entry: its own {@code id}, the id of the person it
    * belongs to ({@code personId}), and the profession label.
    *
    * <p>Spark builds the DataFrame schema for {@code createDataFrame(list, Profession.class)}
    * from the bean's getter/setter pairs, so the accessors below expose the fields as columns.
    */
   public class Profession implements Serializable {

       private static final long serialVersionUID = 7845266779357094461L;

       private long id;
       private long personId;
       private String profession;

       /** Creates a profession entry for the person identified by {@code personId}. */
       public Profession(long id, long personId, String profession) {
           super();
           this.id = id;
           this.personId = personId;
           this.profession = profession;
       }

       /** No-arg constructor required by the JavaBean convention. */
       public Profession() {
           super();
       }

       public long getId() {
           return id;
       }

       public void setId(long id) {
           this.id = id;
       }

       public long getPersonId() {
           return personId;
       }

       public void setPersonId(long personId) {
           this.personId = personId;
       }

       public String getProfession() {
           return profession;
       }

       public void setProfession(String profession) {
           this.profession = profession;
       }
   }

   /**
    * JavaBean used as the typed view of the joined person/profession rows
    * ({@code Dataset.as(Encoders.bean(Jointure.class))}).
    *
    * <p>Bean encoders read and write properties through getter/setter pairs, so the
    * accessors below are required for the mapping to work.
    *
    * <p>NOTE(review): the sample code selects {@code personId, nom, courriel, id, profession}
    * before calling {@code as(...)}. This bean also declares {@code prenom}, which is not
    * selected, and {@code as} presumably needs every bean property to resolve to a column —
    * confirm, or drop/select {@code prenom} accordingly.
    */
   public class Jointure implements Serializable {

       private static final long serialVersionUID = 4341834876589947018L;

       private long id;
       private String nom;
       private String prenom;
       private String courriel;
       private String profession;

       /** Creates a fully populated joined row. */
       public Jointure(long id, String nom, String prenom, String courriel, String profession) {
           super();
           this.id = id;
           this.nom = nom;
           this.prenom = prenom;
           this.courriel = courriel;
           this.profession = profession;
       }

       /** No-arg constructor required by the JavaBean convention (used by bean encoders). */
       public Jointure() {
           super();
       }

       public long getId() {
           return id;
       }

       public void setId(long id) {
           this.id = id;
       }

       public String getNom() {
           return nom;
       }

       public void setNom(String nom) {
           this.nom = nom;
       }

       public String getPrenom() {
           return prenom;
       }

       public void setPrenom(String prenom) {
           this.prenom = prenom;
       }

       public String getCourriel() {
           return courriel;
       }

       public void setCourriel(String courriel) {
           this.courriel = courriel;
       }

       public String getProfession() {
           return profession;
       }

       public void setProfession(String profession) {
           this.profession = profession;
       }
   }
    rowPerson.show();
    +--------------------+---+-----+
    |            courriel| id|  nom|
    +--------------------+---+-----+
    |[email protected]|  1|nom_1|
    |[email protected]|  2|nom_2|
    |[email protected]|  3|nom_3|
    |[email protected]|  4|nom_4|
    +--------------------+---+-----+

    personRenamed.show();
    +--------------------+--------+-----+
    |            courriel|personId|  nom|
    +--------------------+--------+-----+
    |[email protected]|       1|nom_1|
    |[email protected]|       2|nom_2|
    |[email protected]|       3|nom_3|
    |[email protected]|       4|nom_4|
    +--------------------+--------+-----+

    rowProfession.show();
    +---+--------+------------+
    | id|personId|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    professionRenamed.show();
    +---+--------+------------+
    | id|personFk|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    innerJoinData.show();
    +--------------------+--------+-----+---+--------+------------+
    |            courriel|personId|  nom| id|personFk|  profession|
    +--------------------+--------+-----+---+--------+------------+
    |[email protected]|       2|nom_2|  1|       2|profession_4|
    |[email protected]|       1|nom_1|  2|       1|profession_2|
    |[email protected]|       1|nom_1|  3|       1|profession_5|
    |[email protected]|       2|nom_2|  4|       2|profession_2|
    |[email protected]|       2|nom_2|  5|       2|profession_5|
    |[email protected]|       3|nom_3|  6|       3|profession_7|
    |[email protected]|       3|nom_3|  7|       3|profession_2|
    |[email protected]|       4|nom_4|  8|       4|profession_2|
    |[email protected]|       4|nom_4|  9|       4|profession_7|
    +--------------------+--------+-----+---+--------+------------+

    joinResult.show();
    +--------+-----+--------------------+---+------------+
    |personId|  nom|            courriel| id|  profession|
    +--------+-----+--------------------+---+------------+
    |       1|nom_1|[email protected]|  3|profession_5|
    |       1|nom_1|[email protected]|  2|profession_2|
    |       2|nom_2|[email protected]|  4|profession_2|
    |       2|nom_2|[email protected]|  5|profession_5|
    |       2|nom_2|[email protected]|  1|profession_4|
    |       3|nom_3|[email protected]|  7|profession_2|
    |       3|nom_3|[email protected]|  6|profession_7|
    |       4|nom_4|[email protected]|  8|profession_2|
    |       4|nom_4|[email protected]|  9|profession_7|
    +--------+-----+--------------------+---+------------+

    joinResult.printSchema();
    root
     |-- personId: long (nullable = false)
     |-- nom: string (nullable = true)
     |-- courriel: string (nullable = true)
     |-- id: long (nullable = false)
     |-- profession: string (nullable = true)




© www.soinside.com 2019 - 2024. All rights reserved.