如何使用Python跳过n行二进制stdin?

问题描述 投票:0回答:1

我正在使用Hadoop CLI将二进制数据传输到Hadoop集群上的Python脚本。二进制数据具有标识新文档开始位置的终止符。记录按唯一标识符排序,标识符从1000000001开始并递增1。

我试图仅为字典中的这些ID的子集保存数据。

我目前的流程是使用以下方法从CLI中选择数据:

hadoop select "Database" "Collection" | cut -d$'\t' -f2 | python script.py

并在script.py中处理它,如下所示:

import json
import sys

member_mapping = json.load(open('member_mapping.json'))

output = []

for line in sys.stdin:
    person = json.loads(line)
    if member_mapping.get(person['personId']):
        output.append({person['personId']: person})
    if len(output) == len(member_mapping):
        break

问题是这个二进制数据中有6.5M ID,扫描需要将近2个小时。我知道我的字典中的min()和max()ID,你可以在我的代码中看到我在保存n个文件时提前停止,其中n是我的映射文件的长度。

我希望通过尽可能多的读取来提高此过程的效率。如果ID从1000000001开始,我要保存的第一个ID是1000010001,我可以简单地跳过10,000行吗?

由于系统问题,我目前无法使用spark或任何其他可能改进此过程的工具,因此我需要坚持使用Python和Hadoop CLI的解决方案。

python hadoop optimization binaryfiles
1个回答
0
投票

您可以尝试使用enumerate和阈值,然后跳过任何不在您关注的rane中的输入。这不是一个直接的解决方案,但应该运行得更快,并将相同的前10,000行扔掉。

for lineNum, line in enumerate(sys.stdin):
    if(lineNum < 10000):
         continue
    person = json.loads(line)
    if member_mapping.get(person['personId']):
        output.append({person['personId']: person})
    if len(output) == len(member_mapping):
        break
© www.soinside.com 2019 - 2024. All rights reserved.