Handling newlines in a pipe-delimited file

Problem description

We have a 50 GB text file, delimited by pipes (|), like the following:

column1|column2|col
umn3|column4|column5

value1|value2|value
3|value4|value5

The challenge is reading this into a PySpark DataFrame, because each column/value wraps onto the next line after a fixed width. If every column were quoted, I could tell Spark to use multiline mode and treat the quoted value as a single field even when it contains a newline. How should this case be handled? Should I preprocess the file before loading it into Spark?
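
For reference, a minimal sketch of the multiline read described above, assuming the fields were already quoted (the path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pipe-delimited").getOrCreate()

# With every field quoted, multiLine mode treats a newline inside quotes as
# part of the value rather than as a record boundary.
df = (
    spark.read
    .option("sep", "|")          # pipe delimiter
    .option("quote", '"')        # fields wrapped in double quotes
    .option("multiLine", True)   # keep newlines inside quoted fields
    .option("header", True)
    .csv("../data/pipedelimited_quoted.csv")
)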

Hoping for a solution to read this file.

python linux pyspark
1 Answer

You can use dask for this preprocessing task. The code below processes the 50 GB file in 500 MB blocks and writes the output to 5 partitions. Everything is delayed/lazy, just as in Spark. Let me know how it goes. You may have to drop the header row from the data and then supply the column names to the Spark DataFrame yourself.

Install dask with:

pip install dask[complete]

import dask.bag as db
from dask.distributed import Client


def preprocess_line(line):
    # Wrap every pipe-delimited field in double quotes:
    # 'a|b|c' -> '"a"|"b"|"c"'.
    processed_line = '|'.join([f'"{field}"' for field in line.split('|')])
    return processed_line


if __name__ == '__main__':

    input_file = "../data/pipedelimited.csv"

    # Creating a Client makes it the default scheduler for the work below.
    client = Client(n_workers=8, threads_per_worker=4)
    b = db.read_text(input_file, blocksize="500MB")  # one partition per 500 MB block; blocksize=None for streaming. Lines keep their trailing "\n".
    line_count = b.count()
    line_count_computed = line_count.compute()
    print(f"count of lines in whole file = {line_count_computed}")

    delayed_partitions = b.to_delayed()

    # first_flag and first_line persist across partitions, so a logical record
    # split across a partition boundary is still paired correctly.
    first_flag = True
    first_line = None
    second_line = None
    processed_lines = []

    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()
        lines = iter(partition)
        print(f"first line = {lines}")  # note: this prints the iterator object, not a line

        try:
            while True:
                if first_flag:
                    first_line = next(lines)  # first physical half of a record
                    first_flag = False
                    continue
                else:
                    second_line = next(lines)
                    # Rejoin the two halves of one logical record. first_line
                    # still ends with "\n", so the quoted field will span two
                    # physical lines; Spark can read that with multiline mode.
                    # Use first_line.rstrip("\n") instead to emit clean
                    # single-line records.
                    final_line = first_line + second_line.strip()
                    processed_line = preprocess_line(final_line)
                    processed_lines.append(processed_line)
                    first_flag = True

        except StopIteration:
            print("Reached the end of the list.")


    # Build a bag from the cleaned records and write everything once, after
    # every partition has been consumed.
    processed_bag = db.from_sequence(processed_lines, npartitions=5)

    # to_textfiles writes one file per partition; a "*" in the path, if given,
    # is replaced by the partition number.
    output_path = "../dask_output/processed_corrected.csv"
    processed_bag.to_textfiles(output_path)

The output looks like:

count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
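
Once the preprocessed files exist, they can be read back into Spark. A minimal sketch, assuming the output path written above and illustrative column names (the header row is assumed to have been dropped during preprocessing, as noted in the answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# quote + multiLine let Spark reassemble records whose quoted fields still
# contain embedded newlines.
df = (
    spark.read
    .option("sep", "|")
    .option("quote", '"')
    .option("multiLine", True)
    .csv("../dask_output/processed_corrected.csv")
    .toDF("column1", "column2", "column3", "column4", "column5")
)
df.show(truncate=False)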