背景
对于伊利诺伊州的所有道路,我有超过 500,000 个道路对象,这些道路具有线的 Geoshape 属性。我另外还有一组针对全州各点的对象。
需要
我想向点对象类型的支持数据集中添加一列,用于表示距每个点最近的道路的 ID。大多数道路都在点的 50m 范围内,因此这一事实可以帮助优化所选择的任何方法。
之前的尝试
我尝试使用 Palantir 原生地理空间工具库的 DataFrame.knn_join() 方法部分。然而,测试表明它显然无法找到最接近点的线。只有找到距离某个点最近的点才有效。也需要很长时间。
我还尝试执行 DataFrame.distance_join() 但是它返回一定距离内的所有对象,而我只想要最近的对象。我想我可以得到这些点 50 m 以内的所有道路,然后计算每个结果与该点之间的距离并找到最小值,但这似乎有点矫枉过正,它将排除超过 50 m 以外的道路。
我最终想到使用另一个库而不是地理空间工具来完成我想要的事情。我询问 ChatGPT 如何做到这一点,它提出了使用 GeoSpark 的部分代码:
from transforms.api import transform_df, Input, Output
from geospatial_tools import geospatial
from geospark.register import GeoSparkRegistrator
from geospark.core.spatialOperator import JoinQuery
@geospatial()
@transform_df(
Output("ri.foundry.main.dataset.46a58ef8-732f-4bad-9b19-8e3aab9f5d30"),
roads=Input("ri.foundry.main.dataset.32ea817c-1f13-4295-b0a1-345ca38e64d2"),
points=Input("ri.foundry.main.dataset.e0530819-d744-49ac-9e39-91bacd41d199")
)
def compute(ctx, roads, points):
GeoSparkRegistrator.registerAll(ctx.spark_session)
joined_df = JoinQuery.SpatialJoinQuery(points, roads, True, False)
return joined_df
但是,当我运行此命令时,我收到此错误:
Java classpath reference error
A Python dependency you are using is attempting to reference a Java jar not in the classpath. Please check recently added Python dependencies, and add a dependency on the necessary Java packages (JARs) in the build.gradle file.
/transforms-python/src/myproject/datasets/nearest-road.py
GeoSparkRegistrator.registerAll(ctx.spark_session)
我不知道如何解决这个问题。
让我知道另一个解决方案或如何修复此代码!
要回答您的问题,您可能需要编写一些自定义内容来实现您的用例。您的数据规模是多少?您可以在本地编写一些内容而不会遇到内存问题,还是需要分布式计算?
基本上你想在线上创建一个STR树,然后对于每个点调用最近邻方法来获取距离每个点最近的线。这样做的一个缺点是所有数据可能都需要装入驱动程序的内存中。这是使用 shapely python 库,因此您可能需要以所有正常方式将其导入到您的存储库中。
顺便说一句:
JoinQuery.SpatialJoinQuery(points, roads, True, False)
将始终返回 false。最后一个参数必须是谓词 - 请参阅 https://github.com/apache/sedona/blob/master/docs/tutorial/rdd.md#write-a-spatial-join-query
您可以在代码存储库中执行此操作,但需要注意的是,没有方便的方法来利用分布式处理,因为 Apache Sedona 的 KNN 一次仅适用于一个点,而其他解决方案使用内存中工具(例如 geopandas)。 2021 年有一篇关于在 Sedona 实现 KNN 连接查询的论文,该论文将以 Spark 作业的正常方式进行分发,但我无法找到该论文的任何源代码或更新。
如果未来的读者更熟悉 geopandas 或其他一些地理空间包,那么这对他们来说可能是比下面的代码更好的方法。我的解决方案无法扩展到海量数据(但许多形状文件实际上在磁盘上并没有那么大)。我在 TIGER 的 Illinois Sinkholes shapefile 和 Illinois Roads shapefile 上测试了此代码。
有关此代码的注释:
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.spatialOperator import KNNQuery
from sedona.core.enums import IndexType
from shapely import wkt
import logging
from pyspark.sql import Row
logger = logging.getLogger(__name__)
@geospatial()
@transform(
output_df=Output("<output_path>"),
points=Input("<input_points_shapefile_dataset>"),
roads=Input("<input_lines_shapefile_dataset>"),
)
def compute(ctx, points, roads, output_df):
SedonaRegistrator.registerAll(ctx.spark_session)
roads_rdd = ShapefileReader.readToGeometryRDD(
ctx.spark_session.sparkContext, roads.filesystem().hadoop_path
)
roads_rdd.analyze()
points_rdd = ShapefileReader.readToGeometryRDD(
ctx.spark_session.sparkContext, points.filesystem().hadoop_path
)
points_rdd.analyze()
roads_rdd.buildIndex(IndexType.RTREE, False)
points_df = Adapter.toDf(points_rdd, ctx.spark_session)
k = 1
using_index = True
points_list = points_df.collect() # noqa
nearest_roads = []
for point in points_list:
try:
nearest_road = (
KNNQuery.SpatialKnnQuery(
roads_rdd, point.asDict()["geometry"], k, using_index
)
.pop()
.getUserData()
)
except Exception as e:
logger.warn(e)
nearest_road = None
p_dict = point.asDict()
p_dict["nearest_road"] = nearest_road
p_dict["geometry"] = wkt.dumps(p_dict["geometry"])
nearest_roads.append(Row(**p_dict))
points_nearest_df = ctx.spark_session.createDataFrame(nearest_roads)
output_df.write_dataframe(points_nearest_df)