试图在数据流模板中获得数据存储的名称空间的运行时值

问题描述 投票:0回答:1

我正在尝试使用数据流模板中的运行时值提供程序从不同名称空间的数据存储中获取数据。

设置运行时ValueProvider

class CatalaugeOption(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
             parser.add_value_provider_argument('--partner', default='', dest='partner', help='current date directory')

        my_options = options.view_as(CatalaugeOption)
        parser.add_argument('--kind', dest='kind', default='Product', help='Datastore Kind')
        known_args, pipeline_args = parser.parse_known_args(argv)

从数据存储读取数据

with beam.Pipeline(argv=pipeline_args) as p:
        req_options = p.options.get_all_options()
        project = req_options['project']
        namespace = my_options.partner
        kind = known_args.kind
        query = query_pb2.Query()
        query.kind.add().name = kind
        protobufs = p | 'Read From Datastore' >> ReadFromDatastore(
            project, query, namespace=namespace)
         p.run()
        return

ValueError:无效的DisplayDataItem。值RuntimeValueProvider(选项:partner,类型:str,default_value:”)是不受支持的类型。

google-cloud-datastore google-cloud-dataflow
1个回答
0
投票

请注意,此功能已合并到Apache Beam master分支中,应该成为即将推出的2.19.0版本的一部分。

https://github.com/apache/beam/pull/10683https://issues.apache.org/jira/browse/BEAM-7810

© www.soinside.com 2019 - 2024. All rights reserved.