为什么要遍历ndb数据存储区查询消耗太多内存?

问题描述 投票:0回答:1

我有一个类似的查询:

query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()

for ent in iter:
    # write log to file, nothing memory intensive

我在for循环中添加了日志,读取1万行增加了200MB的内存使用量,然后读取接下来的1万行增加了200MB的存储量,依此类推。读取100K需要2GB,超过了内存上限。

读取10K行后,我通过添加以下内容,尝试在for循环中清除内存缓存:

                # clear ndb cache in order to reduce memory footprint
                context = ndb.get_context()
                context.clear_cache()

在for循环中,每第10K次迭代,但导致查询超时,引发错误BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results. ndb

[我最初的期望是通过使用query.iter()而不是query.fetch(),我不会遇到任何内存问题,并且内存几乎是恒定的,但事实并非如此。有没有一种方法可以使用迭代器读取数据,而不会超出时间或内存限制?通过清除上下文缓存,我看到内存消耗几乎是恒定的,但是在检索所有行所需的时间上遇到了麻烦。

BTW,有很多行要检索,最多150K。是否可以通过一些简单的调整来完成此操作,或者我需要更复杂的解决方案,例如一种将使用一些并行化?

python python-2.7 performance google-cloud-datastore app-engine-ndb
1个回答
0
投票

您是否在remote-api-shell中运行此程序?否则,我会以为App Engine的最大请求超时将开始成为问题。

您当然应该在Google数据流中运行它。它将为您并行化/运行更快。

https://beam.apache.org/documentation/programming-guide/https://beam.apache.org/releases/pydoc/2.17.0/index.htmlhttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py

我想您的管道代码看起来像这样:

def run(project, gcs_output_prefix, exec_id):

    def format_result(element):
        csv_cells = [
            datastore_helper.get_value(element.properties['exec_id']),
            # extract other properties here!!!!!
        ]
        return ','.join([str(cell) for cell in csv_cells])

    pipeline_options = PipelineOptions([])
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    query = query_pb2.Query()
    query.kind.add().name = 'HistoryLogs'

    datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)

    _ = (p 
         | 'read' >> ReadFromDatastore(project, query, None)
         | 'format' >> beam.Map(format_result)
         | 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
                                          file_name_suffix='.csv',
                                          num_shards=1) # limits output to a single file
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(project='YOUR-PROJECT-ID', 
        gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
        exec_id='1234567890')

此代码从Google数据存储区读取并以csv格式导出到Google Cloud Storage。

© www.soinside.com 2019 - 2024. All rights reserved.