关于spark的大多数问题都使用show
作为代码示例而没有生成数据帧的代码,如下所示:
df.show()
+-------+--------+----------+
|USER_ID|location| timestamp|
+-------+--------+----------+
| 1| 1001|1265397099|
| 1| 6022|1275846679|
| 1| 1041|1265368299|
+-------+--------+----------+
如何在我的编程环境中重现此代码而无需手动重写? pyspark在熊猫中有一些相当于read_clipboard
的东西?
缺少将数据导入我的环境的功能是我在Stackoverflow中帮助其他人使用pyspark的一大障碍。
所以我的问题是:
从show
命令将stackoverflow中粘贴的数据复制到我的环境中最方便的方法是什么?
您始终可以使用以下功能:
from pyspark.sql.functions import *
def read_spark_output(file_path):
step1 = spark.read \
.option("header","true") \
.option("inferSchema","true") \
.option("delimiter","|") \
.option("parserLib","UNIVOCITY") \
.option("ignoreLeadingWhiteSpace","true") \
.option("ignoreTrailingWhiteSpace","true") \
.option("comment","+") \
.csv("file://{}".format(file_path))
# select not-null columns
step2 = t.select([c for c in t.columns if not c.startswith("_")])
# deal with 'null' string in column
return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
这是以下问题中给出的建议之一:How to make good reproducible Apache Spark examples。
注1:有时候,可能会出现一些特殊情况,这可能由于某种原因或其他原因而不适用,并且可能会产生错误/问题,即Group by column "grp" and compress DataFrame - (take last not null value for each column ordering by column "ord")。所以请谨慎使用!
注2 :(免责声明)我不是代码的原作者。感谢@MaxU的代码。我刚刚对它做了一些修改。
迟到的答案,但我经常面临同样的问题所以为这个https://github.com/ollik1/spark-clipboard写了一个小实用程序
它基本上允许复制粘贴数据框显示字符串来激发。要安装它,添加jcenter依赖项com.github.ollik1:spark-clipboard_2.12:0.1
和spark config .config("fs.clipboard.impl", "com.github.ollik1.clipboard.ClipboardFileSystem")
之后,可以直接从系统剪贴板中读取数据帧
val df = spark.read
.format("com.github.ollik1.clipboard")
.load("clipboard:///*")
或者如果您愿意,可以选择文件安装细节和用法在自述文件中描述。
您始终可以将pandas中的数据作为pandas数据帧读取,然后将其转换回spark数据帧。不,与pandas不同,pyspark中没有直接等效的read_clipboard。
原因是Pandas数据帧大多是平面结构,因为spark数据帧可以有结构,数组等复杂结构,因为它有各种各样的数据类型,而且没有出现在控制台输出上,因此不可能重新创建输出中的数据帧。
您可以组合panda read_clipboard,并转换为pyspark数据帧
from pyspark.sql.types import *
pdDF = pd.read_clipboard(sep=',',
index_col=0,
names=['USER_ID',
'location',
'timestamp',
])
mySchema = StructType([ StructField("USER_ID", StringType(), True)\
,StructField("location", LongType(), True)\
,StructField("timestamp", LongType(), True)])
#note: True (implies nullable allowed)
df = spark.createDataFrame(pdDF,schema=mySchema)
更新:
@terry真正想要的是将ASCII代码表复制到python,以下是示例。当您将数据解析为python时,您可以转换为任何内容。
def parse(ascii_table):
header = []
data = []
for line in filter(None, ascii_table.split('\n')):
if '-+-' in line:
continue
if not header:
header = filter(lambda x: x!='|', line.split())
continue
data.append(['']*len(header))
splitted_line = filter(lambda x: x!='|', line.split())
for i in range(len(splitted_line)):
data[-1][i]=splitted_line[i]
return header, data