使用 PySpark 从纬度/经度列创建 LineString

问题描述 投票:0回答:3

我有一个 PySpark 数据框,其中包含由“trajectories_id”列标识的不同轨迹的纬度/经度点。

轨迹_id 纬度 经度
1 45 5
1 45 6
1 45 7
2 46 5
2 46 6
2 46 7

我想要做的是为每个轨迹_id 提取一个 LineString 并将其存储在另一个数据框中,其中每一行代表一个带有“id”和“geometry”列的轨迹。在此示例中,输出应为:

轨迹_id 几何
1 线串(5 45, 6 45, 7 45)
2 线串(5 46, 6 46, 7 46)

这与这个问题中提出的问题类似,但就我而言,我需要使用 PySpark。

我尝试过以下方法:

import pandas as pd
from shapely.geometry import Point,LineString
df = pd.DataFrame([[1, 45,5], [1, 45,6], [1, 45,7],[2, 46,5], [2, 46,6], [2, 46,7]], columns=['trajectory_id', 'latitude','longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)),columns=['geometry','trajectory_id'])
k=0
for i in idx_:
    df2=df1.filter(F.col("trajectory_id").isin(i)).toPandas()
    df2['points']=df2[["longitude", "latitude"]].apply(Point, axis=1)
    geo_df.geometry.iloc[k]=str(LineString(df2['points']))
    geo_df['trajectory_id'].iloc[k]=i
    k=k+1

这段代码可以工作,但是正如在我的任务中我正在处理更多的轨迹(> 200万)一样,这需要很长时间,因为我在每次迭代中都转换为 Pandas。 有没有一种方法可以以更有效的方式获得相同的输出? 如前所述,我知道使用 toPandas() (和/或 Collect() )是我应该避免的事情,尤其是在 for 循环中

pyspark apache-spark-sql shapely
3个回答
1
投票

注意:线串将按照表中的轨迹进行排序。

更改 Drashti 的片段的一些内容,因为它没有完全将点集转换为线串。 Apache Sedona 安装是必要的。

import pyspark.sql.functions as func

long_lat_df = result.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(","), func.col("latitude")));

grouped_df = long_lat_df.groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(",", func.col("geometry")))

final_df.createOrReplaceTempView("final_df")

query = """select *, ST_LineStringFromText(final_df.geometry, ',') as linestring from final_df"""

final_df = spark.sql(query)
final_df.show()

0
投票

您可以使用 pyspark SQL 的本机函数来完成此操作。

import pyspark.sql.functions as func

long_lat_df = df.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(" "), func.col("latitude")));

grouped_df = long_lat_df .groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(", ", func.col("geometry")));

0
投票

另一种选择是使用 Mosaic 库中的

st_point
st_makeline
st_aswkt
(请参阅 文档)。 这将为您提供众所周知的文本 (WKT) 格式的 LineString(如问题中所要求的)。

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
import mosaic

# Create the PySpark DataFrame
pd_df = pd.DataFrame(
    [
        [1,45,5],
        [1,45,6],
        [1,45,7],
        [2,46,5],
        [2,46,6],
        [2,46,7],
    ],
    columns=["trajectory_id", "latitude", "longitude"]
)
spark_df = spark.createDataFrame(pd_df)

# Create LineString column
final_df = (
    spark_df
    .withColumn("longitude", F.col("longitude").cast(T.DoubleType()))
    .withColumn("latitude", F.col("latitude").cast(T.DoubleType()))
    .withColumn("point", mosaic.st_point(F.col("longitude"), F.col("latitude")))
    
    .groupBy("trajectory_id")
    .agg(F.collect_list(F.col("point")).alias("list_of_points"))
    
    .withColumn("linestring_geom", mosaic.st_makeline("list_of_points"))
    .withColumn("linestring", mosaic.st_aswkt("linestring_geom"))
    
    .select("trajectory_id", "linestring")
)

请注意,

st_makeline
不保证点的顺序保持相同。

© www.soinside.com 2019 - 2024. All rights reserved.