我有一个类似的查询:
query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()
for ent in iter:
# write log to file, nothing memory intensive
我在for循环中添加了日志,读取1万行增加了200MB的内存使用量,然后读取接下来的1万行增加了200MB的存储量,依此类推。读取100K需要2GB,超过了内存上限。
读取10K行后,我通过添加以下内容,尝试在for循环中清除内存缓存:
# clear ndb cache in order to reduce memory footprint
context = ndb.get_context()
context.clear_cache()
在for循环中,每第10K次迭代,但导致查询超时,引发错误BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results. ndb
。
[我最初的期望是通过使用query.iter()
而不是query.fetch()
,我不会遇到任何内存问题,并且内存几乎是恒定的,但事实并非如此。有没有一种方法可以使用迭代器读取数据,而不会超出时间或内存限制?通过清除上下文缓存,我看到内存消耗几乎是恒定的,但是在检索所有行所需的时间上遇到了麻烦。
BTW,有很多行要检索,最多150K。是否可以通过一些简单的调整来完成此操作,或者我需要更复杂的解决方案,例如一种将使用一些并行化?
您是否在remote-api-shell中运行此程序?否则,我会以为App Engine的最大请求超时将开始成为问题。
您当然应该在Google数据流中运行它。它将为您并行化/运行更快。
https://beam.apache.org/documentation/programming-guide/https://beam.apache.org/releases/pydoc/2.17.0/index.htmlhttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
我想您的管道代码看起来像这样:
def run(project, gcs_output_prefix, exec_id):
def format_result(element):
csv_cells = [
datastore_helper.get_value(element.properties['exec_id']),
# extract other properties here!!!!!
]
return ','.join([str(cell) for cell in csv_cells])
pipeline_options = PipelineOptions([])
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
query = query_pb2.Query()
query.kind.add().name = 'HistoryLogs'
datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)
_ = (p
| 'read' >> ReadFromDatastore(project, query, None)
| 'format' >> beam.Map(format_result)
| 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
file_name_suffix='.csv',
num_shards=1) # limits output to a single file
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run(project='YOUR-PROJECT-ID',
gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
exec_id='1234567890')
此代码从Google数据存储区读取并以csv格式导出到Google Cloud Storage。