We have a 50GB text file, pipe-delimited (|), like the following:
column1|column2|col
umn3|column4|column5
value1|value2|val
ue3|value4|value5
So the challenge is to read this into a PySpark DataFrame, because every row spills onto the next line after a fixed width. If every column were quoted, I could tell Spark to read multiline and treat a quoted value as a single field even when it contains a newline. How should this case be handled? Should I preprocess the file before loading it into Spark?
Hoping for a solution to read this file.
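(For context, this is the kind of read I mean, given an existing SparkSession spark; a sketch, with an illustrative file name:)

df = (spark.read
      .option("sep", "|")
      .option("quote", '"')
      .option("multiLine", True)  # a quoted field may span physical lines
      .option("header", True)
      .csv("pipedelimited.csv"))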
You can use dask for this preprocessing task. The code below processes the 50GB file in 500MB chunks and writes the output to 5 partitions. Everything is delayed/lazy, just as in Spark. Let me know how it goes. You may have to drop the header row from the data and then supply the column names on the Spark DataFrame instead.
Install dask with
pip install dask[complete]
import dask.bag as db
from dask.distributed import Client

def preprocess_line(line):
    # Wrap every pipe-delimited field in double quotes,
    # e.g. 'value1|value2' -> '"value1"|"value2"'
    return '|'.join([f'"{field}"' for field in line.split('|')])

if __name__ == '__main__':
    input_file = "../data/pipedelimited.csv"
    client = Client(n_workers=8, threads_per_worker=4)

    b = db.read_text(input_file, blocksize="500MB")  # blocksize=None streams instead of chunking
    line_count = b.count()
    line_count_computed = line_count.compute()
    print(f"count of lines in whole file = {line_count_computed}")

    delayed_partitions = b.to_delayed()
    first_flag = True
    first_line = None
    second_line = None
    processed_lines = []

    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()
        lines = iter(partition)
        print(f"first line = {lines}")  # note: this prints the iterator object itself
        try:
            while True:
                if first_flag:
                    first_line = next(lines)
                    first_flag = False
                    continue
                else:
                    second_line = next(lines)
                    # Each logical row was wrapped across two physical lines,
                    # so glue consecutive pairs back together before quoting.
                    final_line = first_line.rstrip('\n') + second_line.strip()
                    processed_line = preprocess_line(final_line)
                    processed_lines.append(processed_line)
                    first_flag = True
        except StopIteration:
            print("Reached the end of the list.")

    processed_bag = db.from_sequence(processed_lines, npartitions=5)
    # to_textfiles needs a '*' in the path so each partition gets its own file
    output_path = "../dask_output/processed_corrected_*.csv"
    processed_bag.to_textfiles(output_path)
The output is as follows:
count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
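Afterwards the preprocessed files can be read back into Spark. A minimal sketch, assuming the header row was dropped during preprocessing and using column names taken from the sample above (adjust both to your data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read_preprocessed").getOrCreate()

# Fields are now quoted and each logical row sits on a single line,
# so a plain pipe-delimited read is enough; column names are supplied here.
df = (spark.read
      .option("sep", "|")
      .option("quote", '"')
      .csv("../dask_output/processed_corrected_*.csv")
      .toDF("column1", "column2", "column3", "column4", "column5"))
df.show(5, truncate=False)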