我有一个名为grid_df
的数据框,它包含许多矩形坐标。另一个名为trajectory_df
的数据框,包含许多点坐标。
from pyspark.sql import SparkSession
sqlContext = SparkSession.builder.master("local").appName("test").enableHiveSupport().getOrCreate()
data = [(0,0,0,5,5),(1,0,3,5,8)]
grid_df = sqlContext.createDataFrame(data, schema=['grid_id','lat_min','lng_min','lat_max','lng_max'])
data = [(0,1,1,),(1,2,2),(2, 4, 3),(3, 7, 4)]
trajectory_df = sqlContext.createDataFrame(data, schema=['point_id','lng','lat'])
数据显示:
+-------+-------+-------+-------+-------+
|grid_id|lat_min|lng_min|lat_max|lng_max|
+-------+-------+-------+-------+-------+
| 0| 0| 0| 5| 5|
| 1| 0| 3| 5| 8|
+-------+-------+-------+-------+-------+
+--------+---+---+
|point_id|lng|lat|
+--------+---+---+
| 0| 1| 1|
| 1| 2| 2|
| 2| 4| 3|
| 3| 7| 4|
+--------+---+---+
我想为每个坐标点找到包含它的矩形。我希望输出如下。 new_list
表示包含该点的矩形id grid_id
。
+--------+---+---+--------+
|point_id|lng|lat|new_list|
+--------+---+---+--------+
| 0| 1| 1| [0]|
| 1| 2| 2| [0]|
| 2| 4| 3| [0, 1]|
| 3| 7| 4| [1]|
+--------+---+---+--------+
在我的实际数据中,grid_df
有数千条记录,trajectory_df
有数亿条记录。我有很多信息,比如GIS with pySpark : A not-so-easy journey和PySparkGeoAnalysis。但没有为我找到解决方案,因为这些方法要么应用矩形,要么不支持pyspark。
我希望解决方案足够快。提前致谢。
使用join
可能是一项代价高昂的操作,因为你已经提到trajectory_df
有很多要点。但是,计算速度可归咎于spark中的簇大小。
#Logic here is lat should be between lat_min and lat_max. Similar condition for lng too.
>>> grid_df.join(trajectory_df,((trajectory_df.lat>=grid_df.lat_min) & (trajectory_df.lat<=grid_df.lat_max) & (trajectory_df.lng>=grid_df.lng_min) & (trajectory_df.lng<=grid_df.lng_max))).groupBy(trajectory_df.point_id).agg(F.collect_list("grid_id").alias("grid_id")).show()
+--------+-------+
|point_id|grid_id|
+--------+-------+
| 0| [0]|
| 1| [0]|
| 3| [1]|
| 2| [0, 1]|
+--------+-------+
试试吧!