我正在使用 hashlib 库(hashlib.md5() 函数),它在 pySpark 中的小文件大小下工作正常。问题是 - 我有大约 2 TB 的巨大文件,我需要计算该文件的 md5 哈希值。 pyspark 有没有办法利用分布式方法来实现这一目标?我当前的方法如下所示。
def md5_cal(row):
file_contents = row[1]
md5_hash = hashlib.md5()
md5_hash.update(file_contents.encode('utf-8'))
return md5_hash.hexdigest()
filepath = "my/HDFS/path"
md5_value = (sc.wholeTextFiles(filepath)).map(md5_cal).collect()
print("MD5 Hash:", md5_value )
可以帮助您实现这一目标的工作流程。
由于这是一个大小为 2 TB 的大文件,因此您需要首先将其拆分为较小的块,例如 1GB。
分裂的原因是这样的:
您遇到的错误是因为二进制文件的大小超出了 Spark 中允许的最大长度,即 2147483647 字节或大约 2GB。您尝试读取的文件约为 7GB,远远超出了此限制。
要高效地进行拆分,您可以使用 aws cli 的较低级别 s3api。
首先,您需要知道文件的大小,以便确定每个块的字节范围。您可以使用 head-object 命令获取文件的元数据,其中包括其大小。
aws s3api head-object --bucket your-bucket-name --key path/to/your/large-file
一旦知道文件的大小,您就可以开始分块下载。您可以通过使用 --range 选项在 get-object 命令中指定字节范围来完成此操作。范围以 bytes=startByte-endByte 格式指定。
例如,要下载文件的前 1 MB(1,048,576 字节),您可以使用范围 bytes=0-1048575。
aws s3api get-object --bucket your-bucket-name --key path/to/your/large-file --range bytes=0-1048575 chunk_000001.bin
将所有这些块放在一个 s3 键下。这些块的文件名应该像
[chunk_000001.bin, chunk_000002.bin ..... so on till ... chunk_010000.bin]
您可以在 shell 脚本中自动执行上述过程,并在与 s3 文件位于同一区域的 EC2 计算机上运行此脚本。
在 Spark 中执行以下步骤:
在具有大量内存的大型集群上并行读取所有这些块。
使用map函数计算这些块的md5sum到
md5sum_column
。
然后选择
[filename, md5sum_column]
列并按文件名列排序。这是为了确保 md5sum 顺序与块文件名按顺序匹配。
将我们在
md5sum_column
列中计算出的所有 md5sum 收集到一个列表中。
连接字符串以创建更大的字符串。
然后找到该字符串的哈希值。这将为您提供 2TB 大文件的最终哈希值。