两个值出现之间的最大间隔 - Spark

问题描述 投票:0回答:1

我有一个以下格式的 Spark 数据框。我想计算返回印度之前从印度到其他国家/地区的最大旅行次数

我创建了一面旗帜来指示印度是航班的始发国还是目的地国。如果是始发航班,则值为 1;如果是目的地航班,则值为 2;否则为 0。

我认为的逻辑是按每个乘客 ID 进行分组,并在 indOriginDestFlg 上执行 collect_list(),这将为我提供每位乘客的完整旅行历史记录。最后,应用 UDF 计算列表中 1 和 2 之间的最大行程间隔。

我想了解是否有使用窗口函数获得相同输出的任何其他最佳方法

乘客ID 航班ID 来自 日期 indOriginDestFlg
4850 433 印度 2017年5月30日 1
4850 445 tj 2017 年 5 月 6 日 0
4850 459 tj 2017年6月12日 0
4850 473 dk 2017年6月18日 0
4850 487 dk ar 2017年6月24日 0
4850 710 ar ir 2017年9月16日 0
4850 730 ir ch 2017年9月21日 0
4850 733 ch 伊尔 2017年9月23日 0
4850 803 伊尔 印度 2017年10月21日 2
4850 897 印度 ch 2017年11月28日 1
4850 920 ch 2017 年 6 月 12 日 0
6613 238 2017年3月26日 0
6613 355 fr 2017 年 4 月 5 日 0
6613 370 fr 印度 2017 年 11 月 5 日 2
6613 597 印度 伊尔 2017 年 1 月 8 日 1
6613 599 伊尔 tk 2/8/2017 0
6613 753 tk 我们 2017年9月29日 0
6613 819 我们 印度 2017年10月27日 2
6613 881 印度 bm 2017年11月22日 1
6613 936 bm 2017年12月12日 0
9030 351 cn bm 2017年2月5日 0
9030 372 bm 2017 年 12 月 5 日 0
9030 431 印度 2017年5月29日 2
9030 433 印度 2017年5月30日 1
9030 448 tk 2017 年 6 月 6 日 0
9030 451 tk PK 2017 年 8 月 6 日 0

上述数据框的预期输出是:

乘客ID 最大旅行次数
4850 7
6613 2
9030 0

如有任何意见,我们将不胜感激。干杯!

apache-spark pyspark apache-spark-sql window-functions
1个回答
0
投票

带范围的窗口函数可用于获取返回日期。 Scala 代码,猜猜,可以轻松翻译成 Python:

// data preparation
val df = Seq(
  (4850, 433, "india", "no", "30/5/2017", 1),
  (4850, 445, "no", "tjv", "5/6/2017", 0),
  (4850, 459, "tj", "no", "12/6/2017", 0),
  (4850, 473, "no", "dk", "18/6/2017", 0),
  (4850, 487, "dk", "ar", "24/6/2017", 0),
  (4850, 710, "ar", "ir", "16/9/2017", 0),
  (4850, 730, "ir", "ch", "21/9/2017", 0),
  (4850, 733, "ch", "il", "23/9/2017", 0),
  (4850, 803, "il", "india", "21/10/2017", 2),
  (4850, 897, "india", "ch", "28/11/2017", 1),
  (4850, 920, "ch", "no", "6/12/2017", 0),
  (6613, 238, "th", "th", "26/3/2017", 0),
  (6613, 355, "th", "fr", "4/5/2017", 0),
  (6613, 370, "fr", "india", "11/5/2017", 2),
  (6613, 597, "india", "il", "1/8/2017", 1),
  (6613, 599, "il", "tk", "2/8/2017", 0),
  (6613, 753, "tk", "us", "29/9/2017", 0),
  (6613, 819, "us", "india", "27/10/2017", 2),
  (6613, 881, "india", "bm", "22/11/2017", 1),
  (6613, 936, "bm", "no", "12/12/2017", 0),
  (9030, 351, "cn", "bm", "2/5/2017", 0),
  (9030, 372, "bm", "no", "12/5/2017", 0),
  (9030, 431, "no", "india", "29/5/2017", 2),
  (9030, 433, "india", "no", "30/5/2017", 1),
  (9030, 448, "no", "tk", "6/6/2017", 0),
  (9030, 451, "tk", "pk", "8/6/2017", 0)
).toDF("passengerId", "flightId", "from", "to", "date", "indOriginDestFlg")
  .withColumn("date", to_date($"date", "d/M/yyyy"))


val backWindow = Window
  .partitionBy("passengerId")
  .orderBy("date")
  .rangeBetween(Window.currentRow, Window.unboundedFollowing)

// action 
val backAndForth = df
  .where($"from" === "india" || $"to" === "india")
  .withColumn("backToIndia", min(when($"to" === "india", $"date").otherwise(null)).over(backWindow))

  .where($"from" === "india")
  .where($"backToIndia".isNotNull)

backAndForth.show(false)

backAndForth.groupBy("passengerId")
  .agg(max(datediff($"backToIndia", $"date")).as("max"))
  .show(false)

输出是:

+-----------+--------+-----+---+----------+----------------+-----------+
|passengerId|flightId|from |to |date      |indOriginDestFlg|backToIndia|
+-----------+--------+-----+---+----------+----------------+-----------+
|6613       |597     |india|il |2017-08-01|1               |2017-10-27 |
|4850       |433     |india|no |2017-05-30|1               |2017-10-21 |
+-----------+--------+-----+---+----------+----------------+-----------+

+-----------+---+
|passengerId|max|
+-----------+---+
|6613       |87 |
|4850       |144|
+-----------+---+
© www.soinside.com 2019 - 2024. All rights reserved.