计算 Spark Dataframe 中分组轨迹之间的成对距离

问题描述 投票:0回答:0

我有一个巨大的 Spark DataFrame(大约 100.000 行),下面有一个示例 DF。轨迹列是纬度、经度对,并附有以秒为单位的时间戳。

data = [
    (1, "IMO123", "PortA", "PortB", [[1.0, 2.0, 1000], [1.1, 2.1, 1010], [1.2, 2.2, 1020]], 3),
    (2, "IMO124", "PortA", "PortC", [[1.0, 2.0, 1000], , [1.4, 2.4, 1025]], 2),
    (3, "IMO125", "PortA", "PortD", [[1.1, 2.1, 1005], [1.2, 2.2, 1015], [1.3, 2.3, 1025]], 3),
    (4, "IMO126", "PortB", "PortC", [[2.0, 3.0, 1000], [2.1, 3.1, 1010], [2.2, 3.2, 1020]], 3),
    (4, "IMO127", "PortB", "PortF", [[2.0, 3.0, 1000], [-1.2, 1.1, 1010], [-2.2, -0.2, 1020],[-5,-4,1030]], 4),]

pandas_df = pd.DataFrame(data,columns=["voyage_id",'imo','fromporta','toportb','trajectory','trajectory_length'])

    voyage_id   imo fromporta   toportb trajectory  trajectory_length
0   1   IMO123  PortA   PortB   [[1.0, 2.0, 1000], [1.1, 2.1, 1010], [1.2, 2.2...   3
1   2   IMO124  PortA   PortC   [[1.0, 2.0, 1000], [1.3, 2.3, 1015], [1.4, 2.4...   3
2   3   IMO125  PortA   PortD   [[1.1, 2.1, 1005], [1.2, 2.2, 1015], [1.3, 2.3...   3
3   4   IMO126  PortB   PortC   [[2.0, 3.0, 1000], [2.1, 3.1, 1010], [2.2, 3.2...   3
4   4   IMO127  PortB   PortF   [[2.0, 3.0, 1000], [-1.2, 1.1, 1010], [-2.2, -...   4

我想按出发港“fromporta”分组,然后添加一个名为“min_distance”的新特征,这是行轨迹与共享同一出发港的另一条轨迹的最小距离。

此距离应使用

traj-dist
库 (https://github.com/bguillouet/traj-dist/tree/master/traj_dist) 计算。它具有成对距离的功能,
tdist.pdist(traj_list,metric="sspd")
,还可以直接测量两个轨迹之间的距离
tdist.sspd(traj1,traj2)
.

我已经开始在“fromporta”上分组并收集轨迹


grouped_df = (
    df.groupBy("fromporta")
    .agg(
        collect_list("trajectory").alias("trajectories"),
        collect_list("voyage_id").alias("voyage_ids")
    )
)

但是从这里开始,我不确定如何进行,非常感谢帮助。

如果共享相同的 voyage_id,我可以添加不测量行之间距离的功能,因为我最终将航程分成更小的部分(长度为 6 的轨迹变成三行,2、4 和 6)分别作为轨迹长度)。

python pandas apache-spark distance
© www.soinside.com 2019 - 2024. All rights reserved.