I have this function:
import csv
from pyspark.sql.types import StructType, StructField, DoubleType

customSchema = StructType([
    StructField("a", DoubleType(), True),
    StructField("b", DoubleType(), True),
    StructField("c", DoubleType(), True),
    StructField("d", DoubleType(), True)])

# Strip NUL characters, parse each ';'-delimited line, keep rows with more than one field
n_1 = sc.textFile("/path/*.txt")\
    .mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition], delimiter=';', quotechar='"'))\
    .filter(lambda line: len(line) > 1)\
    .toDF(customSchema)
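This would create a DataFrame. The problem is that ".mapPartitions" yields every field as a string ("str") by default, which does not match the DoubleType fields in the schema.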
Sample data:
[['0,01', '344,01', '0,00', '0,00']]
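To see why the values arrive as strings, here is a minimal sketch that runs the same csv.reader call on a single line outside Spark. The raw input line is an assumption, reconstructed from the sample data and the ';' delimiter.

```python
import csv

# Assumed raw line (hypothetical), reconstructed from the sample data above.
raw_lines = ['0,01;344,01;0,00;0,00\0']

# Same parsing as in the mapPartitions lambda: strip NUL characters, split on ';'.
rows = list(csv.reader([line.replace('\0', '') for line in raw_lines],
                       delimiter=';', quotechar='"'))

print(rows)                                  # [['0,01', '344,01', '0,00', '0,00']]
print({type(x).__name__ for x in rows[0]})   # {'str'}
```

csv.reader never converts field types on its own, so any numeric conversion has to happen in a separate step.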
Or just work with:
n_1 = sc.textFile("/path/*.txt")\
    .mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition], delimiter=';', quotechar='"'))\
    .filter(lambda line: len(line) > 1)
First, collect all the elements and build a matrix (a list of lists), using the second option.
n_1 = sc.textFile("/path/*.txt")\
    .mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition], delimiter=';', quotechar='"'))\
    .filter(lambda line: len(line) > 1)
# Bring every parsed row back to the driver as a list of lists
matrix = n_1.collect()
Once we have this, we need to know what type of data is in the sublists (in my case, "str").
# Replace ',' with '.' so the strings can be parsed as floats
matrix = [[x.replace(',', '.') for x in i] for i in matrix]
# Convert every sublist element to float
matrix = [[float(x) for x in i] for i in matrix]
df = sc.parallelize(matrix).toDF()
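As a sketch of the conversion step on its own, the helper below (to_floats is a hypothetical name, not part of the code above) applies the same comma-to-dot replacement and float cast to one row at a time. Assuming the rows look like the sample data, it could probably also be passed to .map(...) on the RDD before .toDF(customSchema), which should avoid collecting everything to the driver; that variant is untested here.

```python
def to_floats(row):
    """Convert a row of comma-decimal strings (e.g. '344,01') to floats."""
    return [float(x.replace(',', '.')) for x in row]

# Sample data from the question, used here in place of n_1.collect()
matrix = [['0,01', '344,01', '0,00', '0,00']]

converted = [to_floats(row) for row in matrix]
print(converted)  # [[0.01, 344.01, 0.0, 0.0]]
```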