dask - 在一个超过RAM的大型数据帧上应用一个函数。

问题描述 投票:0回答:1

相信Dask框架能够处理超过RAM大小的数据集。然而,我无法成功地应用它来解决我的问题,听起来是这样的。

我有一个巨大的.csv文件(1.8Gb),其中包含了用户评论的文本,内存为8Gb。我的目标是对给定的数据进行预处理(首先是对句子进行标记化)。为了实现这个目标,我运行了以下代码。

if __name__ == '__main__':

client = Client(n_workers=3, memory_limit='1.5GB', processes=True)
df = dd.read_csv('texts_no_n', dtype={'user_id': int, 'post_id': int, 'text': str})

print('Tokenizing sents')
def tokenize(df):
    df['text'] = df.text.apply(lambda post: nltk.sent_tokenize(post, language='russian'))
    print('tokenized')
    return df

df = df.map_partitions(tokenize, meta=df)
df.compute()

Dask将我的数据框架分割成20个分区。

我希望Dask工作者对每个分区进行迭代。

  1. 对文本进行标记化 (以运行 tokenize(df_part)),并返回给定数据帧的新的预处理部分。
  2. 释放内存,用于从文件中读取分区。在执行 "compute "方法后,它总是这样做。

在迭代完所有的分区后,我希望Dask能将所有的预处理分区连接起来,并返回一个完整的预处理数据帧。

这种行为在我看来是符合逻辑的,也是最节省内存的,尽管实践表明,在处理完整个数据帧之前,Dask并没有释放内存。

在计算了12个20分区后,我的内存用完了,Dask似乎想把工人的数据转储到磁盘上。看一下输出。

Tokenizing sents
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
tokenized
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?  Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 1.05 GB -- Worker memory limit: 1.50 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 979.51 MB -- Worker memory limit: 1.50 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

由于内存泄漏,调度器重新启动了所有的工人。大量的RAM释放,tokenizing的过程又重新开始了(发生在图片中RAM急剧下降的时候)。

enter image description here

我想,当工人重启时,他们会从头开始工作,否则我的数据预处理最终会结束。因此,重启工人并不符合我的需求。

同一个进程运行多次后,调度器会杀死工人,代码终止。

我的问题是

1)有没有可能用Dask或其他工具对大数据进行多处理预处理?

我可以用pandas的数据框架来管理这个1.8Gb的数据集,只用一个进程,但我是出于教育的目的。如果我的数据集超过了我的内存怎么办?比如说,让它成为10Gb。

2)为什么Dask的工作者不能将他们计算出的每个分区的数据转储到磁盘上,以释放RAM?

输出显示工人没有数据可以存储,但事实并非如此,因为我的RAM中充满了数据。如果一个分区大小约为60Mb(就像我的情况一样),Dask难道不能直接转储一些分区吗?

还有一点需要思考的是下面的问题。

考虑3个工人的情况。如果每个worker处理的数据量大致相同,那么对于我的1.8Gb的情况来说,一个进程使用的最大内存量应该是等于以下几点

1) 1.8Gb / 3 * 2 = 1.2Gb,而所需的是。2) 1.8Gb / 3 = 600Mb

在第一种情况下,我将结果乘以2,假设数据花费的是 df = df.map_partitions(tokenize, meta=df) 等于给定的数据量加上处理过的数据量(在我的情况下,大致相同)。数据消耗的第二个公式是我想要的技术,上面概述(我希望Dask工作的方式)。

问题是,我并不具备如此巨大的RAM来容纳数据,由第一个公式消耗。

pyspark data-science python-multiprocessing dask distributed-computing
1个回答
0
投票

Dask的功能和你所期望的一样。 它加载了一大块数据,处理它,然后如果它可以释放它。 然而,你可能会遇到一些问题。

  1. 你在调用 df.compute 这意味着您要求 Dask 将整个数据集返回为一个单一的内存 pandas 数据框架。 相反,您可能想尝试一些类似 df.to_parquet(...) 以便Dask知道您实际上是想把结果写到磁盘或其他聚合中,这样您的输出确实适合内存。

  2. Dask会一次并行运行您的许多任务,因此它会一次加载许多块。

你可能想在这里查看Dask的最佳实践。https:/docs.dask.orgenlatestbest-practices.html


0
投票

最后,我能够回答我自己的问题。

正如实践(和文档)所显示的那样,处理dask的最好方法--是用.parquet格式来使用它。一开始,我把我的大文件分割成许多.parquet文件,并以 df.to_parquet(dir_name)然后把它们装回 dd.read_parquet(dir_name) 并应用了我的函数。

下面的代码对我来说是有效的。

def preprocess_df(df): # To pass to 'map_partition'

    mystem = Mystem()  # Most important to set it here! Don't pass objects as an argument  
    df['text'] = df['text'].apply(lambda x: pr.preprocess_post(x, mystem))

    mystem.close()
    return df

if __name__ == '__main__':
    client = Client(n_workers=4)

    # Splitting the big file
    df = dd.read_csv('texts.csv', dtype={'user_id': int, 'post_id': int, 'text': str}) # Read a big data file
    df = df.repartition(npartitions=df.npartitions*8) # 8 migh be too high, try with lower values at first (e.g., 2 or don't repartition at all)
    df.to_parquet(dir_name) # convert .csv file to .parquet parts

    # Loading the splitted file parts
    df = dd.read_parquet(dir_name)

    # Applying the function 
    df = df.map_partitions(preprocess_df, meta={'user_id': int, 'post_id': int, 'text': object}) # Be sure not to '.compute' here

    df.to_parquet('preprocesed.parquet')
    client.close()

RAM消耗没有超过50%.

我想,不是.parquet格式帮助降低了RAM消耗,而是把文件分成了几个部分。

更新:在传递对象时要小心(mystem)到函数(preprocess_df),因为它可能会导致意外的行为(因为所有的工作者都会尝试塑造这个对象,这在大多数情况下不是我们想要的)。如果你需要传递额外的 "多处理问题 "对象,请在函数中定义它们(如第三行。mystem = Mystem()).

© www.soinside.com 2019 - 2024. All rights reserved.