我有一个以下格式的 Spark 数据框。我想计算返回印度之前从印度到其他国家/地区的最大旅行次数。
我创建了一面旗帜来指示印度是航班的始发国还是目的地国。如果是始发航班,则值为 1;如果是目的地航班,则值为 2;否则为 0。
我认为的逻辑是按每个乘客 ID 进行分组,并在 indOriginDestFlg 上执行 collect_list(),这将为我提供每位乘客的完整旅行历史记录。最后,应用 UDF 计算列表中 1 和 2 之间的最大行程间隔。
我想了解是否有使用窗口函数获得相同输出的任何其他最佳方法。
乘客ID | 航班ID | 来自 | 到 | 日期 | indOriginDestFlg |
---|---|---|---|---|---|
4850 | 433 | 印度 | 不 | 2017年5月30日 | 1 |
4850 | 445 | 不 | tj | 2017 年 5 月 6 日 | 0 |
4850 | 459 | tj | 不 | 2017年6月12日 | 0 |
4850 | 473 | 不 | dk | 2017年6月18日 | 0 |
4850 | 487 | dk | ar | 2017年6月24日 | 0 |
4850 | 710 | ar | ir | 2017年9月16日 | 0 |
4850 | 730 | ir | ch | 2017年9月21日 | 0 |
4850 | 733 | ch | 伊尔 | 2017年9月23日 | 0 |
4850 | 803 | 伊尔 | 印度 | 2017年10月21日 | 2 |
4850 | 897 | 印度 | ch | 2017年11月28日 | 1 |
4850 | 920 | ch | 不 | 2017 年 6 月 12 日 | 0 |
6613 | 238 | 第 | 第 | 2017年3月26日 | 0 |
6613 | 355 | 第 | fr | 2017 年 4 月 5 日 | 0 |
6613 | 370 | fr | 印度 | 2017 年 11 月 5 日 | 2 |
6613 | 597 | 印度 | 伊尔 | 2017 年 1 月 8 日 | 1 |
6613 | 599 | 伊尔 | tk | 2/8/2017 | 0 |
6613 | 753 | tk | 我们 | 2017年9月29日 | 0 |
6613 | 819 | 我们 | 印度 | 2017年10月27日 | 2 |
6613 | 881 | 印度 | bm | 2017年11月22日 | 1 |
6613 | 936 | bm | 不 | 2017年12月12日 | 0 |
9030 | 351 | cn | bm | 2017年2月5日 | 0 |
9030 | 372 | bm | 不 | 2017 年 12 月 5 日 | 0 |
9030 | 431 | 不 | 印度 | 2017年5月29日 | 2 |
9030 | 433 | 印度 | 不 | 2017年5月30日 | 1 |
9030 | 448 | 不 | tk | 2017 年 6 月 6 日 | 0 |
9030 | 451 | tk | PK | 2017 年 8 月 6 日 | 0 |
上述数据框的预期输出是:
乘客ID | 最大旅行次数 |
---|---|
4850 | 7 |
6613 | 2 |
9030 | 0 |
如有任何意见,我们将不胜感激。干杯!
带范围的窗口函数可用于获取返回日期。 Scala 代码,猜猜,可以轻松翻译成 Python:
// data preparation
val df = Seq(
(4850, 433, "india", "no", "30/5/2017", 1),
(4850, 445, "no", "tjv", "5/6/2017", 0),
(4850, 459, "tj", "no", "12/6/2017", 0),
(4850, 473, "no", "dk", "18/6/2017", 0),
(4850, 487, "dk", "ar", "24/6/2017", 0),
(4850, 710, "ar", "ir", "16/9/2017", 0),
(4850, 730, "ir", "ch", "21/9/2017", 0),
(4850, 733, "ch", "il", "23/9/2017", 0),
(4850, 803, "il", "india", "21/10/2017", 2),
(4850, 897, "india", "ch", "28/11/2017", 1),
(4850, 920, "ch", "no", "6/12/2017", 0),
(6613, 238, "th", "th", "26/3/2017", 0),
(6613, 355, "th", "fr", "4/5/2017", 0),
(6613, 370, "fr", "india", "11/5/2017", 2),
(6613, 597, "india", "il", "1/8/2017", 1),
(6613, 599, "il", "tk", "2/8/2017", 0),
(6613, 753, "tk", "us", "29/9/2017", 0),
(6613, 819, "us", "india", "27/10/2017", 2),
(6613, 881, "india", "bm", "22/11/2017", 1),
(6613, 936, "bm", "no", "12/12/2017", 0),
(9030, 351, "cn", "bm", "2/5/2017", 0),
(9030, 372, "bm", "no", "12/5/2017", 0),
(9030, 431, "no", "india", "29/5/2017", 2),
(9030, 433, "india", "no", "30/5/2017", 1),
(9030, 448, "no", "tk", "6/6/2017", 0),
(9030, 451, "tk", "pk", "8/6/2017", 0)
).toDF("passengerId", "flightId", "from", "to", "date", "indOriginDestFlg")
.withColumn("date", to_date($"date", "d/M/yyyy"))
val backWindow = Window
.partitionBy("passengerId")
.orderBy("date")
.rangeBetween(Window.currentRow, Window.unboundedFollowing)
// action
val backAndForth = df
.where($"from" === "india" || $"to" === "india")
.withColumn("backToIndia", min(when($"to" === "india", $"date").otherwise(null)).over(backWindow))
.where($"from" === "india")
.where($"backToIndia".isNotNull)
backAndForth.show(false)
backAndForth.groupBy("passengerId")
.agg(max(datediff($"backToIndia", $"date")).as("max"))
.show(false)
输出是:
+-----------+--------+-----+---+----------+----------------+-----------+
|passengerId|flightId|from |to |date |indOriginDestFlg|backToIndia|
+-----------+--------+-----+---+----------+----------------+-----------+
|6613 |597 |india|il |2017-08-01|1 |2017-10-27 |
|4850 |433 |india|no |2017-05-30|1 |2017-10-21 |
+-----------+--------+-----+---+----------+----------------+-----------+
+-----------+---+
|passengerId|max|
+-----------+---+
|6613 |87 |
|4850 |144|
+-----------+---+