我在星火框架新的,需要一些帮助!
假设第一个数据框(df1
)存储用户访问呼叫中心的时间。
+---------+-------------------+
|USER_NAME| REQUEST_DATE|
+---------+-------------------+
| Mark|2018-02-20 00:00:00|
| Alex|2018-03-01 00:00:00|
| Bob|2018-03-01 00:00:00|
| Mark|2018-07-01 00:00:00|
| Kate|2018-07-01 00:00:00|
+---------+-------------------+
第二个数据帧存储有关一个人是否是该组织的成员信息。 OUT意味着用户离开组织。 IN
意味着用户已经到了该组织。 START_DATE
和END_DATE
意味着对应过程的开始和结束。
例如,你可以看到,Alex
留在2018-01-01 00:00:00
组织,而这个过程在2018-02-01 00:00:00
结束。你可以看到,一个用户可以来在不同的时间为Mark
离开组织。
+---------+---------------------+---------------------+--------+
|NAME | START_DATE | END_DATE | STATUS |
+---------+---------------------+---------------------+--------+
| Alex| 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT |
| Bob| 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN |
| Mark| 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN |
| Mark| 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT |
| Meggy| 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT |
+----------+--------------------+---------------------+--------+
我试图让在决赛这样的数据帧。它必须包含从第一个数据帧中的所有记录加列说明此人是否是在请求(REQUEST_DATE
)或没有时间的组织的成员。
+---------+-------------------+----------------+
|USER_NAME| REQUEST_DATE| USER_STATUS |
+---------+-------------------+----------------+
| Mark|2018-02-20 00:00:00| Our user |
| Alex|2018-03-01 00:00:00| Not our user |
| Bob|2018-03-01 00:00:00| Our user |
| Mark|2018-07-01 00:00:00| Our user |
| Kate|2018-07-01 00:00:00| No Information |
+---------+-------------------+----------------+
我想下面的代码,但在finalDF
我有错误:
org.apache.spark.SparkException: Task not serializable
此外,在最终的结果我需要的日期时间。现在在lastRowByRequestId
我只有无时间日期。
码:
val df1 = Seq(
("Mark", "2018-02-20 00:00:00"),
("Alex", "2018-03-01 00:00:00"),
("Bob", "2018-03-01 00:00:00"),
("Mark", "2018-07-01 00:00:00"),
("Kate", "2018-07-01 00:00:00")
).toDF("USER_NAME", "REQUEST_DATE")
df1.show()
val df2 = Seq(
("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
df2.show()
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
case class UserAndRequest(
USER_NAME:String,
REQUEST_DATE:java.sql.Date,
START_DATE:java.sql.Date,
END_DATE:java.sql.Date,
STATUS:String,
REQUEST_ID:Long
)
val joined : Dataset[UserAndRequest] = df1.withColumn("REQUEST_ID", monotonically_increasing_id).
join(df2,$"USER_NAME" === $"NAME", "left").
as[UserAndRequest]
val lastRowByRequestId = joined.
groupByKey(_.REQUEST_ID).
reduceGroups( (x,y) =>
if (x.REQUEST_DATE.getTime > x.END_DATE.getTime && x.END_DATE.getTime > y.END_DATE.getTime) x else y
).map(_._2)
def logic(status: String): String = {
if (status == "IN") "Our user"
else if (status == "OUT") "not our user"
else "No Information"
}
val logicUDF = udf(logic _)
val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"REQUEST_DATE"))
我检查你的代码并运行它。它的工作原理与次要更新。我换成REQUEST_DATE按状态。此外,请注意:火花不序列化的任务大多数情况下发生了,当你不使用的情况下阶层,而是从星火2.X case类是Spark任务自动编码。
val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"STATUS"))
下面是输出
+---------+------------+----------+----------+------+----------+--------------+
|USER_NAME|REQUEST_DATE|START_DATE| END_DATE|STATUS|REQUEST_ID| USER_STATUS|
+---------+------------+----------+----------+------+----------+--------------+
| Mark| 2018-02-20|2018-02-01|2018-03-01| IN| 0| Our user|
| Alex| 2018-03-01|2018-01-01|2018-02-01| OUT| 1| not our user|
| Mark| 2018-07-01|2018-02-01|2018-03-01| IN| 3| Our user|
| Bob| 2018-03-01|2018-02-01|2018-02-05| IN| 2| Our user|
| Kate| 2018-07-01| null| null| null| 4|No Information|
+---------+------------+----------+----------+------+----------+--------------+