如何找到距离地理空间点最近的地理空间线

问题描述 投票:0回答:2

背景

对于伊利诺伊州的所有道路,我有超过 500,000 个道路对象,这些道路具有线的 Geoshape 属性。我另外还有一组针对全州各点的对象。

需要

我想向点对象类型的支持数据集中添加一列,用于表示距每个点最近的道路的 ID。大多数道路都在点的 50m 范围内,因此这一事实可以帮助优化所选择的任何方法。

之前的尝试

我尝试使用 Palantir 原生地理空间工具库的 DataFrame.knn_join() 方法部分。然而,测试表明它显然无法找到最接近点的线。只有找到距离某个点最近的点才有效。也需要很长时间。

我还尝试执行 DataFrame.distance_join() 但是它返回一定距离内的所有对象,而我只想要最近的对象。我想我可以得到这些点 50 m 以内的所有道路,然后计算每个结果与该点之间的距离并找到最小值,但这似乎有点矫枉过正,它将排除超过 50 m 以外的道路。

我最终想到使用另一个库而不是地理空间工具来完成我想要的事情。我询问 ChatGPT 如何做到这一点,它提出了使用 GeoSpark 的部分代码:

from transforms.api import transform_df, Input, Output
from geospatial_tools import geospatial
from geospark.register import GeoSparkRegistrator
from geospark.core.spatialOperator import JoinQuery

@geospatial()
@transform_df(
    Output("ri.foundry.main.dataset.46a58ef8-732f-4bad-9b19-8e3aab9f5d30"),
    roads=Input("ri.foundry.main.dataset.32ea817c-1f13-4295-b0a1-345ca38e64d2"),
    points=Input("ri.foundry.main.dataset.e0530819-d744-49ac-9e39-91bacd41d199")
)
def compute(ctx, roads, points):

    GeoSparkRegistrator.registerAll(ctx.spark_session)

    joined_df = JoinQuery.SpatialJoinQuery(points, roads, True, False)

    return joined_df

但是,当我运行此命令时,我收到此错误:

Java classpath reference error

A Python dependency you are using is attempting to reference a Java jar not in the classpath. Please check recently added Python dependencies, and add a dependency on the necessary Java packages (JARs) in the build.gradle file.
/transforms-python/src/myproject/datasets/nearest-road.py
    GeoSparkRegistrator.registerAll(ctx.spark_session)

我不知道如何解决这个问题。

让我知道另一个解决方案或如何修复此代码!

pyspark geospatial palantir-foundry geospark apache-sedona
2个回答
0
投票

要回答您的问题,您可能需要编写一些自定义内容来实现您的用例。您的数据规模是多少?您可以在本地编写一些内容而不会遇到内存问题,还是需要分布式计算?

基本上你想在线上创建一个STR树,然后对于每个点调用最近邻方法来获取距离每个点最近的线。这样做的一个缺点是所有数据可能都需要装入驱动程序的内存中。这是使用 shapely python 库,因此您可能需要以所有正常方式将其导入到您的存储库中。

顺便说一句:

JoinQuery.SpatialJoinQuery(points, roads, True, False)
将始终返回 false。最后一个参数必须是谓词 - 请参阅 https://github.com/apache/sedona/blob/master/docs/tutorial/rdd.md#write-a-spatial-join-query


0
投票

您可以在代码存储库中执行此操作,但需要注意的是,没有方便的方法来利用分布式处理,因为 Apache Sedona 的 KNN 一次仅适用于一个点,而其他解决方案使用内存中工具(例如 geopandas)。 2021 年有一篇关于在 Sedona 实现 KNN 连接查询的论文,该论文将以 Spark 作业的正常方式进行分发,但我无法找到该论文的任何源代码或更新。

如果未来的读者更熟悉 geopandas 或其他一些地理空间包,那么这对他们来说可能是比下面的代码更好的方法。我的解决方案无法扩展到海量数据(但许多形状文件实际上在磁盘上并没有那么大)。我在 TIGER 的 Illinois Sinkholes shapefileIllinois Roads shapefile 上测试了此代码。

有关此代码的注释:

  • 它假设您将所有相关的 shapefile 文件上传到数据集(使用数据集作为 .shp、.cpg 等文件的文件夹)。一组用于点,一组用于线。
  • 它硬编码了您需要更改的列名称假设
  • 它应用了 Spark 反模式,通过收集的数据帧进行循环
  • 您关心的输出列(其中包含道路信息的列)是道路数据集中该行的所有列的制表符分隔字符串;它在输出数据集中被命名为“nearest_road”。

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.spatialOperator import KNNQuery
from sedona.core.enums import IndexType
from shapely import wkt
import logging
from pyspark.sql import Row


logger = logging.getLogger(__name__)


@geospatial()
@transform(
    output_df=Output("<output_path>"),
    points=Input("<input_points_shapefile_dataset>"),
    roads=Input("<input_lines_shapefile_dataset>"),
)
def compute(ctx, points, roads, output_df):
    SedonaRegistrator.registerAll(ctx.spark_session)
    roads_rdd = ShapefileReader.readToGeometryRDD(
        ctx.spark_session.sparkContext, roads.filesystem().hadoop_path
    )
    roads_rdd.analyze()
    points_rdd = ShapefileReader.readToGeometryRDD(
        ctx.spark_session.sparkContext, points.filesystem().hadoop_path
    )
    points_rdd.analyze()
    roads_rdd.buildIndex(IndexType.RTREE, False)
    points_df = Adapter.toDf(points_rdd, ctx.spark_session)
    k = 1
    using_index = True
    points_list = points_df.collect()  # noqa
    nearest_roads = []
    for point in points_list:
        try:
            nearest_road = (
                KNNQuery.SpatialKnnQuery(
                    roads_rdd, point.asDict()["geometry"], k, using_index
                )
                .pop()
                .getUserData()
            )
        except Exception as e:
            logger.warn(e)
            nearest_road = None
        p_dict = point.asDict()
        p_dict["nearest_road"] = nearest_road
        p_dict["geometry"] = wkt.dumps(p_dict["geometry"])
        nearest_roads.append(Row(**p_dict))
    points_nearest_df = ctx.spark_session.createDataFrame(nearest_roads)
    output_df.write_dataframe(points_nearest_df)

© www.soinside.com 2019 - 2024. All rights reserved.