PySpark DataFrame (pyspark.sql.dataframe.DataFrame) to CSV

Problem description · Votes: 0 · Answers: 2

I have a transactions table like this:

transactions.show()

+---------+-----------------------+--------------------+
|person_id|collect_set(title_name)|          prediction|
+---------+-----------------------+--------------------+
|  3513736|   [Make or Break, S...|[Love In Island.....|
|  3516443|        [The Blacklist]|[Moordvrouw, The ...|
|  3537643|   [S4 - Dutch progr...|[Vamos met de Fam...|
|  3547688|   [Phileine Zegt So...|                  []|
|  3549345|   [The Wolf of Wall...|                  []|
|  3550565|   [Achtste Groepers...|                  []|
|  3553669|   [Mega Mindy: Reis...|                  []|
|  3558162|   [Snitch, Philomen...|                  []|
|  3561387|   [Automata, The Hi...|[Bella Donna's, M...|
|  3570126|   [The Wolf of Wall...|                  []|
|  3576602|   [Harry & Meghan: ...|[Weg van Jou, Moo...|
|  3586366|   [Gooische Vrouwen...|[Familieweekend, ...|
|  3586560|   [Hooligans 3: Nev...|                  []|
|  3590208|   [S2 - Dutch drama...|[Love In Island.....|
+---------+-----------------------+--------------------+

The schema of the table is as follows:

transactions.printSchema()

root
 |-- person_id: long (nullable = false)
 |-- collect_set(title_name): array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- prediction: array (nullable = true)
 |    |-- element: string (containsNull = true)

Now I want to write this table to csv while preserving the contents of each column. I tried the following:

transactions.repartition(1)\
.write.mode('overwrite')\
.save(path="//Users/King/Documents/my_final.csv", format='csv',sep=',',header = 'true')

However, I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-66-7473346bdbb1> in <module>()
----> 1 vl_assoc_rules_pred.repartition(1).write.mode('overwrite').save(path="s3a://ci-data-apps/rashid/vl-assoc-rules/vl_assoc_rules_pred.csv", format='csv',sep=',',header = 'true')

/usr/lib/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    593             self._jwrite.save()
    594         else:
--> 595             self._jwrite.save(path)
    596 
    597     @since(1.4)

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o840.save.
Can someone tell me how to write this table so that the contents of each column stay intact?
Thanks in advance!

dataframe pyspark export-to-csv apache-spark-sql
2 Answers
0 votes

csv



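The save in the question fails because Spark's csv source cannot serialize array<string> columns. A minimal sketch of one workaround, assuming Spark 2.0+ and the column names from the question (the titles alias and the | delimiter are arbitrary choices): flatten each array into a plain delimited string with concat_ws, then write.

from pyspark.sql import functions as F

# csv cannot hold array<string> columns, so join each array into a
# single delimited string before writing
flat = (transactions
        .withColumnRenamed('collect_set(title_name)', 'titles')
        .withColumn('titles', F.concat_ws('|', 'titles'))
        .withColumn('prediction', F.concat_ws('|', 'prediction')))

flat.repartition(1) \
    .write.mode('overwrite') \
    .csv('//Users/King/Documents/my_final.csv', sep=',', header=True)

concat_ws never returns null, so rows with empty prediction arrays come out as blank cells instead of failing the write.
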
-1 votes

You can convert the frame to pandas and save it as CSV:

panda_df = df.toPandas()
panda_df.to_csv('mycsv.csv', sep=',')
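
With the question's frame, the array columns arrive in pandas as Python lists, which to_csv would write as list reprs rather than plain text. A small sketch of joining them first (the | delimiter and the empty-string fallback for null arrays are assumptions):

pdf = transactions.toPandas()  # pulls the whole frame onto the driver
for c in ['collect_set(title_name)', 'prediction']:
    # join each list into one delimited string; empty string for nulls
    pdf[c] = pdf[c].map(lambda xs: '|'.join(xs) if xs is not None else '')
pdf.to_csv('mycsv.csv', index=False)

Bear in mind that toPandas() only works when the whole frame fits in driver memory.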

Alternatively, you can use spark-csv:

Spark 1.3

df.save('mycsv.csv', 'com.databricks.spark.csv')

Spark 1.4+

df.write.format('com.databricks.spark.csv').save('mycsv.csv')

In Spark 2.0+, you can use the csv data source directly:

df.write.csv('mycsv.csv')
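
Note that none of these writers accept the question's array<string> columns as-is: the built-in csv source and spark-csv both reject complex types, so flatten the arrays first (for example with concat_ws, as sketched in the answer above).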
