PySpark: converting latitude/longitude to UTM coordinates - Python worker failed to connect back

Question · Votes: 0 · Answers: 1

I have a JSON file of latitude/longitude coordinates that I am trying to convert to UTM ("x", "y") in PySpark. The .json file looks like this:

{"positionmessage":{"latitude": 51.822872161865234,"longitude": 4.905614852905273}}
{"positionmessage":{"latitude": 51.819644927978516, "longitude": 4.961687088012695}}

I read the JSON file in PySpark and tried to convert it to UTM ('x', 'y' coordinates) with the following script:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType

appName = "PySpark"
master = "local"
file_name = "lat_lon.JSON"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

schema = StructType([
    StructField("positionmessage",
    StructType([
    StructField('latitude', DoubleType(), True),
    StructField('longitude', DoubleType(), True),
    ]))])

df  = spark.read.schema(schema).json(file_name).select("positionmessage.*")

Up to this point everything works; the problem appears when I try to convert to UTM coordinates using the pyproj package (which works fine in Pandas).

from pyspark.sql.functions import array, pandas_udf, PandasUDFType
from pyproj import Proj
from pandas import Series

# using decorator 'pandas_udf' to wrap the function. 
@pandas_udf('array<double>', PandasUDFType.SCALAR) 
def get_utm(x):
  pp = Proj(proj='utm',zone=31,ellps='WGS84', preserve_units=False)
  return Series([ pp(e[0], e[1]) for e in x ])

df = df.withColumn('utm', get_utm(array('longitude','latitude'))) \
  .selectExpr("*", "utm[0] as X", "utm[1] as Y")

df.show()

I get the error "Python worker failed to connect back", yet the code itself looks correct. What could be the problem?
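As an aside: this particular error is often an environment problem rather than a logic problem. One common cause (an assumption here, not something the question confirms) is the Spark worker launching a different Python interpreter than the driver. Pinning both to the same interpreter before starting the script sometimes resolves it:

```shell
# Point the driver and the workers at the same interpreter
# (interpreter name is illustrative; adjust to your installation)
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
```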

pyspark converters pyspark-dataframes
1 Answer

0 votes

You can use a plain UDF instead of a pandas UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyproj import Proj

@udf(returnType=ArrayType(DoubleType()))
def get_utm(long, lat):
  pp = Proj(proj='utm', zone=31, ellps='WGS84', preserve_units=False)
  # pyproj returns an (x, y) tuple; a list maps cleanly to ArrayType
  return list(pp(long, lat))


result = df.withColumn('utm', get_utm('longitude','latitude')).selectExpr("*", "utm[0] as X", "utm[1] as Y")
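The conversion logic itself can be sanity-checked outside Spark. A minimal sketch (assuming pyproj is installed) applies the same projection to the first sample point from the question's JSON file:

```python
from pyproj import Proj

# Same projection the UDF builds: UTM zone 31, WGS84 ellipsoid
pp = Proj(proj='utm', zone=31, ellps='WGS84', preserve_units=False)

# First sample point from the question's JSON, passed as (longitude, latitude)
x, y = pp(4.905614852905273, 51.822872161865234)

# Easting lands a bit east of the 500 km false easting; northing is
# roughly 5.74 million metres for a point near 51.8 degrees north
print(x, y)
```

If these values look plausible, the problem is in the Spark/Python worker setup rather than in the coordinate transformation.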