我有一个巨大的 Spark DataFrame(大约 100.000 行),下面有一个示例 DF。轨迹列是纬度、经度对,并附有以秒为单位的时间戳。
data = [
(1, "IMO123", "PortA", "PortB", [[1.0, 2.0, 1000], [1.1, 2.1, 1010], [1.2, 2.2, 1020]], 3),
(2, "IMO124", "PortA", "PortC", [[1.0, 2.0, 1000], , [1.4, 2.4, 1025]], 2),
(3, "IMO125", "PortA", "PortD", [[1.1, 2.1, 1005], [1.2, 2.2, 1015], [1.3, 2.3, 1025]], 3),
(4, "IMO126", "PortB", "PortC", [[2.0, 3.0, 1000], [2.1, 3.1, 1010], [2.2, 3.2, 1020]], 3),
(4, "IMO127", "PortB", "PortF", [[2.0, 3.0, 1000], [-1.2, 1.1, 1010], [-2.2, -0.2, 1020],[-5,-4,1030]], 4),]
pandas_df = pd.DataFrame(data,columns=["voyage_id",'imo','fromporta','toportb','trajectory','trajectory_length'])
voyage_id imo fromporta toportb trajectory trajectory_length
0 1 IMO123 PortA PortB [[1.0, 2.0, 1000], [1.1, 2.1, 1010], [1.2, 2.2... 3
1 2 IMO124 PortA PortC [[1.0, 2.0, 1000], [1.3, 2.3, 1015], [1.4, 2.4... 3
2 3 IMO125 PortA PortD [[1.1, 2.1, 1005], [1.2, 2.2, 1015], [1.3, 2.3... 3
3 4 IMO126 PortB PortC [[2.0, 3.0, 1000], [2.1, 3.1, 1010], [2.2, 3.2... 3
4 4 IMO127 PortB PortF [[2.0, 3.0, 1000], [-1.2, 1.1, 1010], [-2.2, -... 4
我想按出发港“fromporta”分组,然后添加一个名为“min_distance”的新特征,这是行轨迹与共享同一出发港的另一条轨迹的最小距离。
此距离应使用
traj-dist
库 (https://github.com/bguillouet/traj-dist/tree/master/traj_dist) 计算。它具有成对距离的功能,tdist.pdist(traj_list,metric="sspd")
,还可以直接测量两个轨迹之间的距离tdist.sspd(traj1,traj2)
.
我已经开始在“fromporta”上分组并收集轨迹
grouped_df = (
df.groupBy("fromporta")
.agg(
collect_list("trajectory").alias("trajectories"),
collect_list("voyage_id").alias("voyage_ids")
)
)
但是从这里开始,我不确定如何进行,非常感谢帮助。
如果共享相同的 voyage_id,我可以添加不测量行之间距离的功能,因为我最终将航程分成更小的部分(长度为 6 的轨迹变成三行,2、4 和 6)分别作为轨迹长度)。