Google Dataflow: passing a Datastore key as an input parameter


I am trying to create a Google Dataflow template that reads a JSON file and loads it into Google Datastore. My code is below.

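I can load the data successfully, but I want to pass the Datastore key / KIND as an input parameter to the template and use it to create the entities. Can someone help me with how to pass it in the code?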

Below is the snippet that reads the input parameters at runtime. --datastore_key is one of them.

from apache_beam.options.pipeline_options import PipelineOptions


class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--json_input',
                dest='json_input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

        parser.add_value_provider_argument(
                '--project_id',
                dest='project_id',
                type=str,
                required=False,
                help='Input Project ID.')

        parser.add_value_provider_argument(
                '--datastore_key',
                dest='datastore_key',
                type=str,
                required=False,
                help='The Key name')

Below is the snippet where, following the instruction here, I use datastore_key to create the entities.

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id):
       self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key ,element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

I am creating the pipeline as follows:

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))

When I pass it as a runtime parameter, the Datastore key is not created. If I hardcode it like this, it works fine:

key = self.client.key('customer' ,element['customerNumber'])

I want something like this:

key = self.client.key(runtime_datastore_key ,runtime_datastore_id)

Can someone help me pass the Datastore key/kind as a runtime parameter?

Thanks, GS

python-3.x google-cloud-datastore google-cloud-dataflow value-provider
1 Answer

It looks like you are not passing the datastore_key value provider to CreateHbaseRow.
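The reason the provider itself has to be handed to the DoFn, and read only inside process(), is ordering: a template's graph is built before --datastore_key has a value. The stdlib-only model below illustrates that ordering. DeferredProvider, EagerFn, LazyFn and demo are hypothetical names, and the model only approximates Beam, where a runtime value provider likewise cannot be read while the pipeline is being constructed.

```python
class DeferredProvider:
    """Hypothetical model of a runtime value provider: the value only exists once the job runs."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Stands in for the template launcher supplying --datastore_key=... at run time.
        self._value = value
        self._resolved = True

    def get(self):
        if not self._resolved:
            raise RuntimeError(f"{self.name}.get() called before the job is running")
        return self._value


class EagerFn:
    """Anti-pattern: reads the parameter while the pipeline graph is being built."""

    def __init__(self, provider):
        self.kind = provider.get()


class LazyFn:
    """Pattern from the answer: keep the provider, read it per element at run time."""

    def __init__(self, provider):
        self.provider = provider

    def process(self, element):
        yield (self.provider.get(), element["customerNumber"])


def demo():
    provider = DeferredProvider("datastore_key")
    try:
        EagerFn(provider)  # fails: the template is still being built
        eager_ok = True
    except RuntimeError:
        eager_ok = False
    lazy = LazyFn(provider)  # succeeds: nothing is read yet
    provider.resolve("customer")  # template launched with --datastore_key=customer
    keys = list(lazy.process({"customerNumber": 7}))
    return eager_ok, keys


if __name__ == "__main__":
    print(demo())  # (False, [('customer', 7)])
```

Reading the value in __init__ fails because nothing has been supplied yet, while the lazy version only needs the provider object to exist when the graph is built.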


Try using:

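import logging

import apache_beam as beam
from google.cloud import datastore


class CreateHbaseRow(beam.DoFn):
    def __init__(self, project_id, datastore_key):
        # Keep the ValueProviders themselves; their values are only available at run time.
        self.project_id = project_id
        self.datastore_key = datastore_key

    def start_bundle(self):
        self.client = datastore.Client()

    def process(self, element):
        try:
            # self.datastore_key.get() resolves the runtime value of --datastore_key (the kind).
            key = self.client.key(self.datastore_key.get(), element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)
            self.client.put(entity)
        except Exception:
            # Use a %s placeholder so the element actually appears in the log message.
            logging.exception("Failed with input: %s", element)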

Note that I left project_id in because you seem to want it, but the code above does not use it.
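To exercise the DoFn's logic without Beam or a Datastore emulator, here is a stdlib-only model of the fixed class. StaticProvider and FakeClient are hypothetical stand-ins (the real datastore.Client.put takes an Entity, not a key plus properties), and CreateEntityFn yields the key it wrote only so the result can be inspected. It is a sketch for checking the logic, not a drop-in replacement.

```python
import logging


class StaticProvider:
    """Hypothetical stand-in for a Beam ValueProvider (only .get() is modeled)."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class FakeClient:
    """Hypothetical in-memory stand-in for datastore.Client."""

    def __init__(self):
        self.store = {}

    def key(self, kind, identifier):
        # The real client returns a Key object; a (kind, id) tuple is enough here.
        return (kind, identifier)

    def put(self, key, properties):
        # Simplified: the real client takes a single Entity argument.
        self.store[key] = dict(properties)


class CreateEntityFn:
    """Mirrors the answer's DoFn: keep the provider in __init__, call .get() in process()."""

    def __init__(self, datastore_key):
        self.datastore_key = datastore_key  # provider object, not a string

    def start_bundle(self):
        self.client = FakeClient()

    def process(self, element):
        try:
            key = self.client.key(self.datastore_key.get(), element["customerNumber"])
            self.client.put(key, element)
            yield key
        except KeyError:
            # Elements without a customerNumber are logged and skipped.
            logging.exception("Failed with input: %s", element)
```

For example, after `fn = CreateEntityFn(StaticProvider("customer"))` and `fn.start_bundle()`, processing `{"customerNumber": 7}` yields the key `("customer", 7)`.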


You also need to make sure the relevant value providers from the options instance are passed to the DoFn, so your pipeline creation code becomes:

# view_as gives access to the custom value-provider options defined in MyOptions.
my_options = options.view_as(MyOptions)
p = beam.Pipeline(options=options)

lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(my_options.json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(
    CreateHbaseRow(my_options.project_id, my_options.datastore_key))
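If runtime_datastore_id from the question is meant to be configurable too, one reading is that the runtime parameter should name the field holding each entity's ID, since the ID itself differs per element. A minimal stdlib-only sketch under that assumption: StaticProvider is a hypothetical stand-in for a value provider, and build_key_args is a hypothetical helper returning the positional arguments for client.key(...).

```python
class StaticProvider:
    """Hypothetical stand-in for a Beam ValueProvider: wraps a value behind .get()."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


def build_key_args(kind_provider, id_field_provider, element):
    """Return (kind, id) for client.key(kind, id), resolving both parameters at call time."""
    kind = kind_provider.get()          # e.g. 'customer'
    id_field = id_field_provider.get()  # e.g. 'customerNumber'
    return kind, element[id_field]


if __name__ == "__main__":
    args = build_key_args(StaticProvider("customer"),
                          StaticProvider("customerNumber"),
                          {"customerNumber": 42, "name": "Ada"})
    print(args)  # ('customer', 42)
```

Inside the DoFn this would become `self.client.key(*build_key_args(self.datastore_key, self.id_field, element))`, where `self.id_field` would come from a second, hypothetical `--datastore_id_field` value-provider option added to MyOptions the same way as --datastore_key.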