我正在使用Hadoop CLI将二进制数据传输到Hadoop集群上的Python脚本。二进制数据具有标识新文档开始位置的终止符。记录按唯一标识符排序,标识符从1000000001开始并递增1。
我试图仅为字典中的这些ID的子集保存数据。
我目前的流程是使用以下方法从CLI中选择数据:
hadoop select "Database" "Collection" | cut -d$'\t' -f2 | python script.py
并在script.py中处理它,如下所示:
import json
import sys
member_mapping = json.load(open('member_mapping.json'))
output = []
for line in sys.stdin:
person = json.loads(line)
if member_mapping.get(person['personId']):
output.append({person['personId']: person})
if len(output) == len(member_mapping):
break
问题是这个二进制数据中有6.5M ID,扫描需要将近2个小时。我知道我的字典中的min()和max()ID,你可以在我的代码中看到我在保存n个文件时提前停止,其中n是我的映射文件的长度。
我希望通过尽可能多的读取来提高此过程的效率。如果ID从1000000001开始,我要保存的第一个ID是1000010001,我可以简单地跳过10,000行吗?
由于系统问题,我目前无法使用spark或任何其他可能改进此过程的工具,因此我需要坚持使用Python和Hadoop CLI的解决方案。
您可以尝试使用enumerate
和阈值,然后跳过任何不在您关注的rane中的输入。这不是一个直接的解决方案,但应该运行得更快,并将相同的前10,000行扔掉。
for lineNum, line in enumerate(sys.stdin):
if(lineNum < 10000):
continue
person = json.loads(line)
if member_mapping.get(person['personId']):
output.append({person['personId']: person})
if len(output) == len(member_mapping):
break