通过 pyspark 读取的 CSV 文件包含数万个 GPS 信息(纬度、经度),通过 geodataframe 读取的 Feather 文件包含数百万个多边形信息。
在上一个问题(在 pandas dataframe 中获取特定范围索引的最佳算法)中,我成功创建了一个地理数据框。
但是,经证实,使用从 pyspark 读取数据的 geodataframe 时会消耗大量计算时间。
首先通过pyspark加载数据,如下。
data = spark.read.option("header", True).csv("path/file.csv")
数据包含车辆的逐秒 GPS 信息,如下:
时间 | 车辆.位置.纬度 | 车辆.位置.经度 |
---|---|---|
2023-01-01 00:00:00 | 37.123456 | 123.123456 |
2023-01-01 00:00:01 | 37.123457 | 123.123457 |
其次,之前创建的geodataframe数据加载如下
gdf = gpd.read_feather("/path/file.feather")
地理数据框包含如下几何信息:
id | MAX_SPD | 几何 |
---|---|---|
0 | 60 | 多边形 ((126.27306 33.19865, 126.27379 33.198... |
1 | 60 | 多边形 ((126.27222 33.19865, 126.27306 33.198... |
接下来,我在 pyspark 中创建了一个用户定义的函数。
目的是找出包含给定GPS信息的多边形的MAX_SPD值。如果它包含在多个多边形中,则检索 max(MAX_SPD)。
def find_intersection(longitude, latitude):
if type(longitude) != float or type(latitude) != float:
return -1
mgdf = gdf['geometry'].contains(Point(longitude, latitude))
max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
if math.isnan(max_spd):
max_spd = -1
return max_spd
find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()
但是,使用用户定义的函数似乎很耗时。 还有其他减少时间消耗的好主意吗?
您可以使用 Apache Sedona 进行地理空间分析。
https://sedona.apache.org/latest-snapshot/tutorial/sql/
我改编了这个笔记本,为您提供了如何执行此操作的示例。 您所要做的就是将 Points CSV 读入 point_rdd,将羽毛文件格式的多边形数据读入 Geopandas 数据帧,然后转换为 Polygon_rdd。进行连接查询并获取结果。结果将包含您的属性,例如
MAX_SPD
,然后您执行聚合查询以检索最大值,即 max(MAX_SPD)
.
笔记本:
https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb
以下脚本中使用的数据可以在此位置找到:
https://github.com/apache/sedona/tree/master/binder/data
要求:
pip install apache-sedona
pip install geopandas
Python 脚本:
import sys
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from shapely.geometry import Point
from shapely.geometry import Polygon
from sedona.spark import *
from sedona.core.geom.envelope import Envelope
config = SedonaContext.builder() .\
config('spark.jars.packages',
'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
config("spark.serializer", KryoSerializer.getName). \
config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
getOrCreate()
sedona = SedonaContext.create(config)
sc = sedona.sparkContext
point_rdd = PointRDD(sc, "../sedona_data/arealm-small.csv", 1, FileDataSplitter.CSV, True, 10, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
## Getting approximate total count
count_approx = point_rdd.approximateTotalCount
print("approximate count", count_approx)
# To run analyze please use function analyze
print("analyse is here", point_rdd.analyze())
# collect to Python list
points_List = point_rdd.rawSpatialRDD.collect()[:5]
print("Printng point list")
print(points_List)
point_rdd_to_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, *x.getUserData().split("\t")])
point_gdf = gpd.GeoDataFrame(
point_rdd_to_geo.collect(), columns=["geom", "attr1", "attr2", "attr3"], geometry="geom"
)
print("GeoPandas Dataframe")
print(point_gdf[:5])
spatial_df = Adapter.\
toDf(point_rdd, ["attr1", "attr2", "attr3"], sedona).\
createOrReplaceTempView("spatial_df")
spatial_gdf = sedona.sql("Select attr1, attr2, attr3, geometry as geom from spatial_df")
spatial_gdf.show(5, False)
## Apache Sedona spatial partitioning method can
# significantly speed up the join query.
# Three spatial partitioning methods are available:
# KDB-Tree, Quad-Tree and R-Tree.
# Two SpatialRDD must be partitioned by the same way.
## Very Important : Choose the Same Spatial Paritioning Everywhere. in this case GridType.KDBTREE
done_value = point_rdd.spatialPartitioning(GridType.KDBTREE)
print("spatial partitioniing done value = ", done_value)
polygon_rdd = PolygonRDD(sc, "../sedona_data/primaryroads-polygon.csv", FileDataSplitter.CSV, True, 11, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
print(polygon_rdd.analyze())
polygon_rdd.spatialPartitioning(point_rdd.getPartitioner()) ### This retrieves the same paritioner i.e. GridType.KDBTREE
# building an index
point_rdd.buildIndex(IndexType.RTREE, True)
polygon_rdd.buildIndex(IndexType.RTREE, True)
# Perform Spatial Join Query
result = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, False)
print("Result of the Join Query")
print(result.take(2))
schema = StructType(
[
StructField("geom_left", GeometryType(), False),
StructField("geom_right", GeometryType(), False)
]
)
# Set verifySchema to False
spatial_join_result = result.map(lambda x: [x[0].geom, x[1].geom])
sedona_df = sedona.createDataFrame(spatial_join_result, schema, verifySchema=False)
print("Schema of Sedona Dataframe")
print(sedona_df.printSchema())
print("Values here")
sedona_df.show(n=10, truncate=False)
输出:
approximate count 3000
analyse is here True
Printng point list
[Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2]
GeoPandas Dataframe
geom attr1 attr2 attr3
0 POINT (-88.33149 32.32414) testattribute0 testattribute1 testattribute2
1 POINT (-88.17593 32.36076) testattribute0 testattribute1 testattribute2
2 POINT (-88.38895 32.35707) testattribute0 testattribute1 testattribute2
3 POINT (-88.22110 32.35078) testattribute0 testattribute1 testattribute2
4 POINT (-88.32399 32.95067) testattribute0 testattribute1 testattribute2
+--------------+--------------+--------------+----------------------------+
|attr1 |attr2 |attr3 |geom |
+--------------+--------------+--------------+----------------------------+
|testattribute0|testattribute1|testattribute2|POINT (-88.331492 32.324142)|
|testattribute0|testattribute1|testattribute2|POINT (-88.175933 32.360763)|
|testattribute0|testattribute1|testattribute2|POINT (-88.388954 32.357073)|
|testattribute0|testattribute1|testattribute2|POINT (-88.221102 32.35078) |
|testattribute0|testattribute1|testattribute2|POINT (-88.323995 32.950671)|
+--------------+--------------+--------------+----------------------------+
only showing top 5 rows
spatial partitioniing done value = True
True
Result of the Join Query
[[Geometry: Polygon userData: , Geometry: Point userData: testattribute0 testattribute1 testattribute2], [Geometry: Polygon userData: , Geometry: Point userData: testattribute0 testattribute1 testattribute2]]
Schema of Sedona Dataframe
root
|-- geom_left: geometry (nullable = false)
|-- geom_right: geometry (nullable = false)
None
Values here
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|geom_left |geom_right |
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.39203 32.507796) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.39203 32.507796) |
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455)) |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455)) |POINT (-88.289412 30.48934) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213)) |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213)) |POINT (-88.289412 30.48934) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.304259 32.488903)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.347036 32.454904)|
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
only showing top 10 rows