如何批量上传数据到appengine数据存储区?较旧的方法不起作用

问题描述 投票:15回答:4

这应该是一个相当普遍的要求,也是一个简单的过程:将数据批量上传到appengine数据存储区。

但是,stackoverflow(下面的链接*)中提到的旧解决方案似乎都不再有效。 bulkloader方法是使用DB API上传到数据存储区时最合理的解决方案,不适用于NDB API

现在,批量加载器方法似乎已被弃用,旧文档中仍然存在的旧链接会导致页面错误。这是一个例子

https://developers.google.com/appengine/docs/python/tools/uploadingdata

以上链接仍然显示在此页面上:https://developers.google.com/appengine/docs/python/tools/uploadinganapp

现在批量加载数据的推荐方法是什么?

两个可行的替代方案似乎是1)使用remote_api或2)将CSV文件写入GCS存储桶并从中读取。有没有人有成功使用这两种方法的经验?

任何指针将不胜感激。谢谢!

[*以下链接提供的解决方案已不再有效]

[1] how does one upload data in bulk to a google appengine datastore?

[2] How to insert bulk data in Google App Engine Datastore?

python google-app-engine google-cloud-storage google-cloud-datastore
4个回答
0
投票

有些人可能在我的情况下:我无法使用数据存储区的导入/导出实用程序,因为我的数据需要在进入数据存储区之前进行转换。

我最终使用apache-beamgoogle cloud dataflow)。

你只需要编写几行“beam”代码即可

  • 读取您的数据(例如,托管在云存储上) - 您将获得PCollection的字符串,
  • 做任何你想要的变换(所以你得到一个PCollection的数据存储实体),
  • 把它们转移到datastore sink

有关具体用例,请参阅How to speedup bulk importing into google cloud datastore with multiple workers?

我能够以每秒800个实体的速度写入我的数据存储区,有5名工作人员。这使我能够在大约5个小时内完成导入任务(包含1600万行)。如果你想让它更快,请使用更多的工人:D


9
投票

方法1:使用remote_api

如何:编写一个bulkloader.yaml文件并使用终端中的“appcfg.py upload_data”命令直接运行它我不推荐这种方法有以下几个原因:1。巨大的延迟2.不支持NDB

方法2:GCS并使用mapreduce

将数据文件上传到GCS:

使用“storage-file-transfer-json-python”github项目(chunked_transfer.py)将文件从本地系统上传到gcs。确保从应用引擎管理控制台生成正确的“client-secrets.json”文件。

MapReduce的:

使用“appengine-mapreduce”github项目。将“mapreduce”文件夹复制到项目顶级文件夹。

将以下行添加到app.yaml文件中:

includes:
  - mapreduce/include.yaml

下面是你的main.py文件

import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader

def testmapperFunc(newRequest):
    f = StringIO.StringIO(newRequest)
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        newEntry = DataStoreModel(attr1=row[0], link=row[1])
        yield op.db.Put(newEntry)

class TestGCSReaderPipeline(base_handler.PipelineBase):
    def run(self, filename):
        yield mapreduce_pipeline.MapreducePipeline(
                "test_gcs",
                "testgcs.testmapperFunc",
                "mapreduce.input_readers.FileInputReader",
                mapper_params={
                    "files": [filename],
                    "format": 'lines'
                },
                shards=1)

class tempTestRequestGCSUpload(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())

        bucket = '/gs/' + bucket_name
        filename = bucket + '/' + 'tempfile.csv'

        pipeline = TestGCSReaderPipeline(filename)
        pipeline.with_params(target="mapreducetestmodtest")
        pipeline.start()
        self.response.out.write('done')

application = webapp2.WSGIApplication([
    ('/gcsupload', tempTestRequestGCSUpload),
], debug=True)

要记住:

  1. Mapreduce项目使用现已弃用的“Google Cloud Storage Files API”。所以未来的支持不能保证。
  2. Map reduce为数据存储读取和写入增加了一点开销。

方法3:GCS和GCS客户端库

  1. 使用上述文件传输方法将csv / text文件上载到gcs。
  2. 使用gcs客户端库(将“cloudstorage”文件夹复制到应用程序顶级文件夹)。

将以下代码添加到应用程序main.py文件中。

import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel

class UploadGCSData(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())
        bucket = '/' + bucket_name
        filename = bucket + '/tempfile.csv'
        self.upload_file(filename)

    def upload_file(self, filename):
        gcs_file = gcs.open(filename)
        datareader = csv.reader(gcs_file)
        count = 0
        entities = []
        for row in datareader:
            count += 1
                newProd = DataStoreModel(attr1=row[0], link=row[1])
                entities.append(newProd)

            if count%50==0 and entities:
                ndb.put_multi(entities)
                entities=[]

        if entities:
            ndb.put_multi(entities)

application = webapp2.WSGIApplication([
    ('/gcsupload', UploadGCSData),
], debug=True)

3
投票

远程API方法,如链接[1]中所示,仍然可以正常工作 - 尽管如果你有超过几百行,它会非常慢。

我已经成功地将GCS与MapReduce框架结合使用来下载而不是上传数据存储区的内容,但原则应该是相同的。请参阅mapreduce documentation:实际上您只需要映射器步骤,因此您可以定义一个简单的函数,该函数接受CSV中的行并从该数据创建数据存储区实体。


1
投票

截至2018年,最好的方法是使用the new import/export capability

© www.soinside.com 2019 - 2024. All rights reserved.