pyspark和geodataframe检查点是否包含在多边形中的最快方法

问题描述 投票:0回答:1

通过 pyspark 读取的 CSV 文件包含数万个 GPS 信息(纬度、经度),通过 geodataframe 读取的 Feather 文件包含数百万个多边形信息。

在上一个问题(在 pandas dataframe 中获取特定范围索引的最佳算法)中,我成功创建了一个地理数据框。

但是,经证实,使用从 pyspark 读取数据的 geodataframe 时会消耗大量计算时间。

首先通过pyspark加载数据,如下。

data = spark.read.option("header", True).csv("path/file.csv")

数据包含车辆的逐秒 GPS 信息,如下:

时间 车辆.位置.纬度 车辆.位置.经度
2023-01-01 00:00:00 37.123456 123.123456
2023-01-01 00:00:01 37.123457 123.123457

其次,之前创建的geodataframe数据加载如下

gdf = gpd.read_feather("/path/file.feather")

地理数据框包含如下几何信息:

id MAX_SPD 几何
0 60 多边形 ((126.27306 33.19865, 126.27379 33.198...
1 60 多边形 ((126.27222 33.19865, 126.27306 33.198...

接下来,我在 pyspark 中创建了一个用户定义的函数。

目的是找出包含给定GPS信息的多边形的MAX_SPD值。如果它包含在多个多边形中,则检索 max(MAX_SPD)。

def find_intersection(longitude, latitude):
    if type(longitude) != float or type(latitude) != float:
        return -1
    mgdf = gdf['geometry'].contains(Point(longitude, latitude))
    max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
    if math.isnan(max_spd):
        max_spd = -1
    return max_spd

find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()

但是,使用用户定义的函数似乎很耗时。 还有其他减少时间消耗的好主意吗?

python pyspark geopandas
1个回答
0
投票

您可以使用 Apache Sedona 进行地理空间分析。

https://sedona.apache.org/latest-snapshot/tutorial/sql/

我改编了这个笔记本,为您提供了如何执行此操作的示例。 您所要做的就是将 Points CSV 读入 point_rdd,将羽毛文件格式的多边形数据读入 Geopandas 数据帧,然后转换为 Polygon_rdd。进行连接查询并获取结果。结果将包含您的属性,例如

MAX_SPD
,然后您执行聚合查询以检索最大值,即
max(MAX_SPD)
.

笔记本:

https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb

以下脚本中使用的数据可以在此位置找到:

https://github.com/apache/sedona/tree/master/binder/data

要求:

pip install apache-sedona

pip install geopandas

Python 脚本:

import sys

from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from shapely.geometry import Point
from shapely.geometry import Polygon

from sedona.spark import *
from sedona.core.geom.envelope import Envelope


config = SedonaContext.builder() .\
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
           'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    getOrCreate()

sedona = SedonaContext.create(config)

sc = sedona.sparkContext

point_rdd = PointRDD(sc, "../sedona_data/arealm-small.csv", 1, FileDataSplitter.CSV, True, 10, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")

## Getting approximate total count
count_approx = point_rdd.approximateTotalCount
print("approximate count", count_approx)

# To run analyze please use function analyze
print("analyse is here", point_rdd.analyze())

# collect to Python list
points_List = point_rdd.rawSpatialRDD.collect()[:5]

print("Printng point list")
print(points_List)

point_rdd_to_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, *x.getUserData().split("\t")])

point_gdf = gpd.GeoDataFrame(
    point_rdd_to_geo.collect(), columns=["geom", "attr1", "attr2", "attr3"], geometry="geom"
)

print("GeoPandas Dataframe")
print(point_gdf[:5])


spatial_df = Adapter.\
    toDf(point_rdd, ["attr1", "attr2", "attr3"], sedona).\
    createOrReplaceTempView("spatial_df")

spatial_gdf = sedona.sql("Select attr1, attr2, attr3, geometry as geom from spatial_df")

spatial_gdf.show(5, False)


## Apache Sedona spatial partitioning method can
# significantly speed up the join query.
# Three spatial partitioning methods are available:
# KDB-Tree, Quad-Tree and R-Tree.
# Two SpatialRDD must be partitioned by the same way.


## Very Important : Choose the Same Spatial Paritioning Everywhere. in this case GridType.KDBTREE
done_value = point_rdd.spatialPartitioning(GridType.KDBTREE)
print("spatial partitioniing done value = ", done_value)

polygon_rdd = PolygonRDD(sc, "../sedona_data/primaryroads-polygon.csv", FileDataSplitter.CSV, True, 11, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
print(polygon_rdd.analyze())
polygon_rdd.spatialPartitioning(point_rdd.getPartitioner())  ### This retrieves the same paritioner i.e. GridType.KDBTREE

# building an index
point_rdd.buildIndex(IndexType.RTREE, True)
polygon_rdd.buildIndex(IndexType.RTREE, True)

# Perform Spatial Join Query
result = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, False)

print("Result of the Join Query")
print(result.take(2))


schema = StructType(
    [
        StructField("geom_left", GeometryType(), False),
        StructField("geom_right", GeometryType(), False)
    ]
)

# Set verifySchema to False
spatial_join_result = result.map(lambda x: [x[0].geom, x[1].geom])
sedona_df = sedona.createDataFrame(spatial_join_result, schema, verifySchema=False)

print("Schema of Sedona Dataframe")
print(sedona_df.printSchema())

print("Values here")
sedona_df.show(n=10, truncate=False)

输出:

approximate count 3000
analyse is here True
Printng point list
[Geometry: Point userData: testattribute0   testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2]
GeoPandas Dataframe
                         geom           attr1           attr2           attr3
0  POINT (-88.33149 32.32414)  testattribute0  testattribute1  testattribute2
1  POINT (-88.17593 32.36076)  testattribute0  testattribute1  testattribute2
2  POINT (-88.38895 32.35707)  testattribute0  testattribute1  testattribute2
3  POINT (-88.22110 32.35078)  testattribute0  testattribute1  testattribute2
4  POINT (-88.32399 32.95067)  testattribute0  testattribute1  testattribute2
+--------------+--------------+--------------+----------------------------+
|attr1         |attr2         |attr3         |geom                        |
+--------------+--------------+--------------+----------------------------+
|testattribute0|testattribute1|testattribute2|POINT (-88.331492 32.324142)|
|testattribute0|testattribute1|testattribute2|POINT (-88.175933 32.360763)|
|testattribute0|testattribute1|testattribute2|POINT (-88.388954 32.357073)|
|testattribute0|testattribute1|testattribute2|POINT (-88.221102 32.35078) |
|testattribute0|testattribute1|testattribute2|POINT (-88.323995 32.950671)|
+--------------+--------------+--------------+----------------------------+
only showing top 5 rows

spatial partitioniing done value =  True
True
Result of the Join Query
[[Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2], [Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2]]
Schema of Sedona Dataframe
root
 |-- geom_left: geometry (nullable = false)
 |-- geom_right: geometry (nullable = false)

None
Values here
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|geom_left                                                                                                               |geom_right                  |
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.39203 32.507796) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.39203 32.507796) |
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455))   |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455))   |POINT (-88.289412 30.48934) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213))   |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213))   |POINT (-88.289412 30.48934) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.304259 32.488903)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.347036 32.454904)|
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
only showing top 10 rows
© www.soinside.com 2019 - 2024. All rights reserved.