如何使用 Apache Sedona 将 parquet 格式数据框中的纬度和经度列转换为点类型(几何)?

问题描述 投票:0回答:1

我有 100 TB 的 parquet 格式的数据。该数据有很多列,包括纬度和经度列。我想使用 Apache Sedona 将这些列转换为点类型列(几何),这样我就能够执行一些其他功能,例如查找多边形中的点。我正在使用带有 pyspark 3.2 的 databricks 笔记本。

有一些使用 SQL 和 Scala 的方法,但我在 Apache Sedona 文档中找不到任何关于如何在 pyspark 中执行此操作的 API。谁能帮我怎么做?

我读取数据的方式是这样的:


rawDf = sedona.read.format("parquet").load("s3://PATH_TO_MY_PARQUET_DATA")
rawDf.createOrReplaceTempView("rawdf")

我尝试了以下代码,但它给了我错误 st_makePoint is not Defined:

df = rawDf.withColumn("geometry", st_makePoint(col("longitude"), col("latitude")))

任何帮助将不胜感激。

geometry bigdata parquet spatial apache-sedona
1个回答
0
投票

最后,经过大量的尝试、错误和搜索,我找到了我的问题的答案。我们可以将点列添加到数据中,如下所示:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
SedonaContext.create(spark)

df = spark.read.parquet("PATH_TO_YOUR_DATA")
df.createOrReplaceTempView("df")
df_with_geom = spark.sql("Select *, ST_Point(longitude, latitude) AS geometry from df")
df_with_geom.createOrReplaceTempView("df_with_geom")

运行此代码后,df_with_geom 将有一个附加列“geometry”,其中包含点,并且它是几何类型。

如果我们假设这些是美国境内的一些点,并且我们想找到哪些点位于哪些州内。这是一个空间连接查询,可以通过以下步骤完成:

首先我们需要为各州提供多边形:

#州界多边形数据来源:

Ubuntu wget https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/boundary-each-state.tsv

states_wkt = spark.read.option("delimiter", "\t").option("header", "false").csv("PATH/boundary-each-state.tsv").toDF("state_name","state_bound")
states_wkt.show()
states_wkt.printSchema()


states = states_wkt.selectExpr("state_name", "ST_GeomFromWKT(state_bound) as state_bound")
states.show()
states.printSchema()
states.createOrReplaceTempView("states")


+-------------+--------------------+
|   state_name|         state_bound|
+-------------+--------------------+
|       Alaska|POLYGON((-141.020...|
|      Alabama|POLYGON((-88.1955...|
|     Arkansas|POLYGON((-94.0416...|
|      Arizona|POLYGON((-112.598...|
|   California|POLYGON((-124.400...|

+-------------+--------------------+
only showing top 20 rows

root
|-- state_name: string (nullable = true)
|-- state_bound: string (nullable = true)

+-------------+--------------------+
|   state_name|         state_bound|
+-------------+--------------------+
|       Alaska|POLYGON ((-141.02...|
|      Alabama|POLYGON ((-88.195...|
|     Arkansas|POLYGON ((-94.041...|
|      Arizona|POLYGON ((-112.59...|
|   California|POLYGON ((-124.40...| 
+-------------+--------------------+
only showing top 20 rows

root
|-- state_name: string (nullable = true)
|-- state_bound: geometry (nullable = true)

最后通过运行以下代码,我们可以将状态列添加到每个点的数据中:

result = spark.sql("SELECT * FROM states s, df_with_geom a WHERE ST_Contains(s.state_bound, a.geometry)")

 
© www.soinside.com 2019 - 2024. All rights reserved.