从大型压缩文件流式传输JSON对象

问题描述 投票:1回答:3

我正在开发一个个人项目,涉及读取JSON对象的大型文件,其中包含可能数百万条目,使用GZip进行压缩。我遇到的问题是确定如何有效地逐行解析这些对象并将它们存储在内存中,这样它们就不会耗尽我系统上的所有RAM。它必须能够在以后访问或构造这些对象以进行分析。我到目前为止所尝试的内容如下

def parse_data(file):
   accounts = []
   with gzip.open(file, mode='rb') as accounts_data:
      for line in accounts_data:
         # if line is not empty
         if len(line,strip()) != 0:
            account = BytesIO(line)
            accounts.append(account)
   return accounts

def getaccounts(accounts, idx):
   account = json.load(accounts[idx])
   # creates account object using fields in account dict
   return account_from_dict(account)

这个实现的一个主要问题是我无法在帐户中访问同一个对象两次而不会导致生成JSONDecodeError。我也不确定这是否是我能做到的最紧凑的方式。

任何援助将不胜感激。

编辑:存储在这些文件中的数据格式如下:

{JSON Object 1}
{JSON Object 2}
...
{JSON Object n}

编辑:我打算使用存储在这些JSON帐户条目中的信息来形成帐户信息中的相似性或模式的图表。

python
3个回答
1
投票

下面是如何随机访问gzip压缩文件中的JSON对象,首先将其解压缩到一个临时文件中,然后使用tell()seek()通过索引检索它们 - 因此只需要足够的内存来保存每个对象的偏移量。

我发布这个主要是因为你问我在评论中这样做的一个例子......我不会这样做,因为它与streaming data不完全相同。主要区别在于,与此不同,它可以访问所有数据,包括能够随意随机访问任何对象。

首先解压缩整个文件会引入一些额外的开销,因此除非您需要能够多次访问JSON对象,否则可能不值得。显示的实现可能通过缓存以前加载的对象来加速,但不知道访问模式将是什么,很难肯定。

import collections.abc
import gzip
import json
import random
import tempfile


class GZ_JSON_Array(collections.abc.Sequence):
    """ Allows objects in gzipped file of JSON objects, one-per-line, to be
        treated as an immutable sequence of JSON objects.
    """
    def __init__(self, gzip_filename):
        self.tmpfile = tempfile.TemporaryFile('w+b')
        # Decompress a gzip file into a temp file and save offsets of the
        # start of each line in it.
        self.offsets = []
        with gzip.open(gzip_filename, mode='rb') as gzip_file:
            for line in gzip_file:
                line = line.rstrip().decode('utf-8')
                if line:
                    self.offsets.append(self.tmpfile.tell())
                    self.tmpfile.write(bytes(line + '\n', encoding='utf-8'))

    def __len__(self):
        return len(self.offsets)

    def __iter__(self):
        for index in range(len(self)):
            yield self[index]

    def __getitem__(self, index):
        """ Return a JSON object at offsets[index] in the given open file. """
        if index not in range(len(self.offsets)):
            raise IndexError
        self.tmpfile.seek(self.offsets[index])
        try:
            size = self.offsets[index+1] - self.offsets[index]  # Difference with next.
        except IndexError:
            size = -1  # Last one - read all remaining data.
        return json.loads(self.tmpfile.read(size).decode())

    def __del__(self):
        try:
            self.tmpfile.close()  # Allow it to auto-delete.
        except Exception:
            pass


if __name__ == '__main__':

    gzip_filename = 'json_objects.dat.gz'

    json_array = GZ_JSON_Array(gzip_filename)

    # Randomly access some objects in the JSON array.
    for index in random.sample(range(len(json_array)), 3):
        obj = json_array[index]
        print('object[{}]: {!r}'.format(index, obj))

0
投票

Hhi,也许使用增量json阅读器,如ijson。这不需要立即将整个结构加载到内存中。


0
投票

根据您在评论中的答案,您似乎只需要扫描对象:

def evaluate_accounts(file):
    results = {}

    with gzip.open(file) as records:
        for json_rec in records:
            if json_rec.strip():
                account = json.loads(json_rec)
                results[account['id']] = evaluate_account(account)

    return results
© www.soinside.com 2019 - 2024. All rights reserved.