首次使用Spark的用户。我为两个csv文件(员工和部门)创建了RDD。我想提供一个输出,该输出按部门ID计算员工人数,并确定员工ID最多的前两个部门名称。 “ deptno”是我的主键,但是我不知道如何将两个文件结合在一起。
雇员文件包含以下几列:[empno,ename,工作,mgr,hiratedate,sal,comm,deptno]
dept文件包含以下列:[deptno,dname,位置]
这是我到目前为止所做的:
`employees_rdd = sc.textFile("/FileStore/tables/Employee.csv")
employees_rdd.take(3)
header_e = employees_rdd.first()
employees1 = employees_rdd.filter(lambda row : row != header_e)
employees1.take(1)`
`dept_rdd = sc.textFile("/FileStore/tables/Dept.csv")
dept_rdd.take(3)
header_d = dept_rdd.first()
dept1 = dept_rdd.filter(lambda row : row != header_d)
dept1.take(1)`
`employees2 = employees1.map(lambda row : row.split(","))
employees_kv = employees2.map(lambda row : (row[7],1))
employees_kv.take(3)`
在下面接收语法错误:
非常感谢您的协助。
这是我的pyspark代码来执行此操作。我假设读取语句带有定界符“ |”。
from pyspark.sql.functions import *
from pyspark.sql.types import *
emp = spark.read.option("header","true") \
.option("inferSchema","true") \
.option("sep","|") \
.csv("/FileStore/tables/employee.txt")
dept = spark.read.option("header","true") \
.option("inferSchema","true") \
.option("sep","|") \
.option("removeQuotes","true") \
.csv("/FileStore/tables/department.txt")
# Employee count by department
empCountByDept = emp.groupBy("deptno") \
.agg(count("empno").alias("no_of_employees"))
empCountByDept.show(20,False)
# Top two department names with the most employees
topTwoDept = empCountByDept.join(dept, empCountByDept.deptno == dept.deptno, "inner") \
.orderBy(empCountByDept.no_of_employees.desc()).drop(dept.deptno) \
.select("dname","no_of_employees") \
.limit(2)
topTwoDept.show(20,False)
结果::
+------+---------------+
|deptno|no_of_employees|
+------+---------------+
|20 |5 |
|10 |3 |
|30 |6 |
+------+---------------+
+----------+---------------+
|dname |no_of_employees|
+----------+---------------+
|'Sales' |6 |
|'Research'|5 |
+----------+---------------+
我强烈建议您使用数据框/数据集。不仅是因为它们更易于使用和操纵,而且还可以显着提高性能。甚至spark文档也建议相同。
这里是您的数据框代码:-
val spark = SparkSession.builder.master("local[*]").getOrCreate
这将创建SparkSession
,它是应用程序的入口点。
现在,让我们阅读您的员工和部门文件。
val employeeDF = spark.read.format("csv").option("header","true").load("/path/to/employee/file")
val deptDF = spark.read.format("csv").option("header","true").load("/path/to/dept/file")
现在,加入非常容易。下面的语句将创建一个数据帧,该数据帧将是employeeDF
列上deptDF
和deptno
之间内部联接的结果
val joinedDF = employeeDF.join(deptDF,Seq("deptno"))
现在,您可以使用
joinedDF
获得结果。
val countByDept = joinedDF.groupBy($"deptno").count //countByDept.show to display the results val top2Dept = joinedDF.groupBy($"dname").count.orderBy($"count".desc).limit(2) //top2Dept.show to display the results
希望这可以让您开始使用Spark DataFrames和DataSets。