按部门ID计数员工并确定拥有最多员工ID的前两个部门

问题描述 投票:0回答:2

首次使用Spark的用户。我为两个csv文件(员工和部门)创建了RDD。我想提供一个输出,该输出按部门ID计算员工人数,并确定员工ID最多的前两个部门名称。 “ deptno”是我的主键,但是我不知道如何将两个文件结合在一起。

雇员文件包含以下几列:[empno,ename,工作,mgr,hiratedate,sal,comm,deptno]

dept文件包含以下列:[deptno,dname,位置]

这是我到目前为止所做的:

`employees_rdd = sc.textFile("/FileStore/tables/Employee.csv")
employees_rdd.take(3)
header_e = employees_rdd.first()
employees1 = employees_rdd.filter(lambda row : row != header_e)
employees1.take(1)`

`dept_rdd = sc.textFile("/FileStore/tables/Dept.csv")
dept_rdd.take(3)
header_d = dept_rdd.first()
dept1 = dept_rdd.filter(lambda row : row != header_d)
dept1.take(1)`

`employees2 = employees1.map(lambda row : row.split(","))
employees_kv = employees2.map(lambda row : (row[7],1))
employees_kv.take(3)`

在下面接收语法错误:

employees_kv.reduceByKey(lambda x,y:x + y).takeOrdered(2,lambda(x,y):-1 * y)

非常感谢您的协助。

apache-spark lambda pyspark apache-spark-sql
2个回答
0
投票

这是我的pyspark代码来执行此操作。我假设读取语句带有定界符“ |”。

from pyspark.sql.functions import *
from pyspark.sql.types import *

emp = spark.read.option("header","true") \
                .option("inferSchema","true") \
                .option("sep","|") \
                .csv("/FileStore/tables/employee.txt")

dept = spark.read.option("header","true") \
                 .option("inferSchema","true") \
                 .option("sep","|") \
                 .option("removeQuotes","true") \
                 .csv("/FileStore/tables/department.txt")

# Employee count by department
empCountByDept = emp.groupBy("deptno") \
                       .agg(count("empno").alias("no_of_employees"))

empCountByDept.show(20,False)

# Top two department names with the most employees 
topTwoDept = empCountByDept.join(dept, empCountByDept.deptno == dept.deptno, "inner") \
                           .orderBy(empCountByDept.no_of_employees.desc()).drop(dept.deptno) \
                           .select("dname","no_of_employees") \
                           .limit(2)
topTwoDept.show(20,False)

结果::

+------+---------------+
|deptno|no_of_employees|
+------+---------------+
|20    |5              |
|10    |3              |
|30    |6              |
+------+---------------+
+----------+---------------+
|dname     |no_of_employees|
+----------+---------------+
|'Sales'   |6              |
|'Research'|5              |
+----------+---------------+

0
投票

我强烈建议您使用数据框/数据集。不仅是因为它们更易于使用和操纵,而且还可以显着提高性能。甚至spark文档也建议相同。

这里是您的数据框代码:-

val spark = SparkSession.builder.master("local[*]").getOrCreate

这将创建SparkSession,它是应用程序的入口点。

现在,让我们阅读您的员工和部门文件。

val employeeDF = spark.read.format("csv").option("header","true").load("/path/to/employee/file")
val deptDF = spark.read.format("csv").option("header","true").load("/path/to/dept/file")

现在,加入非常容易。下面的语句将创建一个数据帧,该数据帧将是employeeDF列上deptDFdeptno之间内部联接的结果

val joinedDF = employeeDF.join(deptDF,Seq("deptno"))

现在,您可以使用joinedDF获得结果。

val countByDept = joinedDF.groupBy($"deptno").count
//countByDept.show to display the results

val top2Dept = joinedDF.groupBy($"dname").count.orderBy($"count".desc).limit(2)
//top2Dept.show to display the results

希望这可以让您开始使用Spark DataFrames和DataSets。

© www.soinside.com 2019 - 2024. All rights reserved.