I have a transactions table like this:
transactions.show()
+---------+-----------------------+--------------------+
|person_id|collect_set(title_name)| prediction|
+---------+-----------------------+--------------------+
| 3513736| [Make or Break, S...|[Love In Island.....|
| 3516443| [The Blacklist]|[Moordvrouw, The ...|
| 3537643| [S4 - Dutch progr...|[Vamos met de Fam...|
| 3547688| [Phileine Zegt So...| []|
| 3549345| [The Wolf of Wall...| []|
| 3550565| [Achtste Groepers...| []|
| 3553669| [Mega Mindy: Reis...| []|
| 3558162| [Snitch, Philomen...| []|
| 3561387| [Automata, The Hi...|[Bella Donna's, M...|
| 3570126| [The Wolf of Wall...| []|
| 3576602| [Harry & Meghan: ...|[Weg van Jou, Moo...|
| 3586366| [Gooische Vrouwen...|[Familieweekend, ...|
| 3586560| [Hooligans 3: Nev...| []|
| 3590208| [S2 - Dutch drama...|[Love In Island.....|
+---------+-----------------------+--------------------+
The schema of the table is as follows:
transactions.printSchema()
root
|-- person_id: long (nullable = false)
|-- collect_set(title_name): array (nullable = true)
| |-- element: string (containsNull = true)
|-- prediction: array (nullable = true)
| |-- element: string (containsNull = true)
Now I want to write this table to CSV while preserving the contents of each column. I tried the following:
transactions.repartition(1)\
.write.mode('overwrite')\
.save(path="//Users/King/Documents/my_final.csv", format='csv',sep=',',header = 'true')
However, I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-66-7473346bdbb1> in <module>()
----> 1 vl_assoc_rules_pred.repartition(1).write.mode('overwrite').save(path="s3a://ci-data-apps/rashid/vl-assoc-rules/vl_assoc_rules_pred.csv", format='csv',sep=',',header = 'true')
/usr/lib/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
593 self._jwrite.save()
594 else:
--> 595 self._jwrite.save(path)
596
597 @since(1.4)
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o840.save.
Can someone tell me how to write this table so that the contents of each column stay intact?
Thanks in advance!
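If the data fits in driver memory, you can convert the Spark DataFrame to a pandas DataFrame and save that as CSV:
panda_df = df.toPandas()
panda_df.to_csv('mycsv.csv', sep=',')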
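With the pandas route, array columns arrive as Python lists (or NumPy arrays, depending on the Spark configuration), and `to_csv` writes their string representation. A minimal sketch of one way to get a predictable cell format is to join each list into a delimited string first. The rows, the column name `titles` (standing in for `collect_set(title_name)`), and the `|` delimiter below are made up for illustration:

```python
import io

import pandas as pd

# Made-up rows standing in for the result of transactions.toPandas().
panda_df = pd.DataFrame(
    {
        "person_id": [1, 2],
        "titles": [["A", "B"], ["C"]],
        "prediction": [["D"], []],
    }
)

# Join each list into a single "|"-delimited string (assumed delimiter;
# pick one that cannot occur inside a title).
for col in ["titles", "prediction"]:
    panda_df[col] = panda_df[col].apply(lambda items: "|".join(items))

# Write to an in-memory buffer here; pass a file path instead to save to disk.
buffer = io.StringIO()
panda_df.to_csv(buffer, index=False)
csv_text = buffer.getvalue()
print(csv_text)
```

When reading the file back, splitting each cell on the same delimiter recovers the original lists.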
Otherwise, you can use spark-csv:
Spark 1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
Spark 1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
In Spark 2.0+ you can use the csv data source directly:
df.write.csv('mycsv.csv')