PySpark: reading data from a table and writing it to a file

Question (votes: -1, answers: 1)

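I am running my PySpark code on an HDInsight Spark cluster. I am trying to read data from a Postgres table and write it to a file, as shown below. pgsql_df comes back as a DataFrameReader instead of a DataFrame, so I cannot write it to a file. Why does "spark.read" return a DataFrameReader? What am I missing here?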

# Only SparkSession is needed for this snippet; the duplicate and unused imports were dropped.
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

>>> pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>


pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)


**Error:** 
 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'





python apache-spark pyspark hdinsight
1 Answer

0 votes

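Check the code below. You are missing a call to load() on the DataFrameReader object. spark.read returns a DataFrameReader, and format() and option() only configure that reader and return it again; the DataFrame is produced only when load() is called.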

pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password") \
    .load()  # this call was missing

pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

Or keep the reader in the variable and call load() just before writing:


pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

# pgsql_df is still a DataFrameReader here, so load() has to come before write
pgsql_df.load() \
    .write \
    .format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
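
To make the reader/DataFrame split harder to get wrong, the two steps can be wrapped in small helpers. This is only a minimal sketch: the function names read_jdbc_table and write_csv are hypothetical (they are not part of PySpark), and it assumes an existing SparkSession is passed in as spark and that the PostgreSQL JDBC driver is available to the cluster. Only format(), option(), options(), load(), mode() and save() are actual DataFrameReader/DataFrameWriter calls.

```python
def read_jdbc_table(spark, url, table, user, password,
                    driver="org.postgresql.Driver"):
    """Hypothetical helper: return a DataFrame for `table` over JDBC.

    `spark` is assumed to be an existing SparkSession.
    """
    reader = (
        spark.read.format("jdbc")  # still a DataFrameReader at this point
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
    )
    # load() is the step that turns the configured reader into a DataFrame.
    return reader.load()


def write_csv(df, path):
    """Hypothetical helper: write `df` to `path` as CSV with a header row."""
    (
        df.write.format("csv")
        .mode("overwrite")  # replace any existing output at `path`
        .options(sep=",", header="true")
        .save(path)
    )
```

With a live session, this would be used as df = read_jdbc_table(spark, "jdbc:postgresql://<hostdetails>", "table", "user", "password") followed by write_csv(df, output). Because load() lives inside the helper, the caller always gets a DataFrame back and never a bare DataFrameReader.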
