如何在python梁中制作通用的Protobuf Parser DoFn?

问题描述 投票:1回答:1

上下文 我正在使用一个流媒体管道,它在pubsub中有一个protobuf数据源。我希望将这个protobuf解析为python dict,因为数据接收器要求输入是dicts的集合。我通过在DoFn的process函数中初始化protobuf消息成功开发了Protobuf Parser。

Why a Generic Protobuf Parser is Needed

但是,我想知道,是否有可能在Beam上制作通用的ProtobufParser DoFn?从工程角度来看,通用DoFn非常有用,可以避免重新实现现有功能并实现代码重用。在Java中,我知道我们能够使用泛型,因此在Java中实现这个通用的ProtobufParser相对容易。由于Python函数是第一类对象,我在想是否可以将Protobuf模式类(而不是消息实例对象)传递给DoFn。我试着这样做,但是我一直都失败了。

Successful Parser with Caveat: not Generalizable

下面是我目前成功的protobuf解析器。 protobuf消息在process函数内初始化。

class ParsePubSubProtoToDict(beam.DoFn):

    def process(self, element, *args, **kwargs):
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        message = DataSchema()
        message.ParseFromString(element)

        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj

虽然上面的Protobuf DoFn解析器很有用,但它并不适用于所有protobuf模式,因此这将导致需要为不同的protobuf模式重新实现新的DoFn解析器。

My Attempts

为了使解析器适用于所有protobuf模式,我尝试将protobuf模式(它是作为Python中的类生成)传递给DoFn。

class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj


def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))

和其他类似的技术,但是,我的所有尝试都失败了同样的错误信息:pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema

从这个错误消息,我有两个假设为什么会出现问题:

  1. Protobuf模式类是不可序列化的。然而,这个假设可能是错误的,因为虽然我知道pickle无法序列化protobuf架构,但如果我使用dill,我能够序列化protobuf架构。但除此之外,我仍然有点不确定python梁中的DoFn如何实现序列化(例如:当它使用dillpickle来序列化事物时,对象的序列化格式是什么使其可序列化并与DoFn兼容,等等。)
  2. 在DoFn类中导入错误。由于函数/类范围和数据流工作者,我遇到了几个python beam的导入错误问题,为了解决这个问题,我不得不在需要它的函数中本地导入包,而不是在模块中全局导入。那么也许,如果我们将protobuf模式类传递给DoFn,模式导入实际上是在DoFn之外完成的,因此DoFn无法正确解析DoFn中的类名?

我的问题是:

  1. 为什么会出现此错误,如何解决此错误?
  2. 是否可以传递protobuf架构类?或者有没有更好的方法来实现python dict解析器DoFn的通用protobuf而不将protobuf架构类传递给DoFn?
  3. 如何在Python中使用DoFn,如何确保传递给DoFn创建的对象(__init__)是可序列化的?在beam上有一个Serializable类,我可以继承它,以便我可以将我的不可序列化的对象转换为可序列化的吗?

非常感谢!非常感谢您的帮助。

python google-cloud-platform protocol-buffers google-cloud-dataflow apache-beam
1个回答
1
投票

我实际上找到了用beam.Map创建通用Protobuf Parser的替代解决方案

def convert_proto_to_dict(data, schema_class):
    message = schema_class()

    if isinstance(data, (str, bytes)):
        message.ParseFromString(data)
    else:
        message = data

    return MessageToDict(message, preserving_proto_field_name=True)


def run_pubsub_to_gbq_pipeline(argv):
    ... options initialization
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))

首先,我创建了一个函数,它接收protobuf模式类和protobuf数据(当前以字节字符串形式)作为参数。此函数将字符串字节数据初始化并解析为protobuf消息,并将protobuf消息转换为python字典。

这个函数然后由beam.Map使用,所以现在我能够在没有beam.DoFn的情况下开发一个通用的Protobuf Parser。但是,我仍然很好奇为什么protobuf架构类在与DoFn一起使用时会出现问题,所以如果你知道为什么以及如何解决这个问题,请在这里分享你的答案,谢谢!

© www.soinside.com 2019 - 2024. All rights reserved.