使用 Apache Beam 和 Python 将带有 ordering_key 的消息写入 Google PubSub

问题描述 投票:0回答:1

我正在尝试使用 Apache Beam (https://cloud.google.com/pubsub/docs/publisher) 将带有 ordering_key 的 Google PubSub 消息写入主题。尽管带有 ordering_key 的 Google Pubsub 是测试版功能,但我可以使用普通的 PubSub 客户端库发布消息。我希望在 Apache Beam 中也能做到这一点。然而,它似乎没有任何可用于 Python Apache Beam 库的东西。 我一直在尝试覆盖beam.io.WriteToPubSub(通过更改_to_proto_str)以使用ordering_key编写protobuf消息(https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google .pubsub.v1.PubsubMessage)。最后消息会是这样的

  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "ordering_key": string
}
sdks.python.apache_beam.io.gcp.pubsub.PubsubMessage._to_proto_str
  def _to_proto_str(self):
    msg = pubsub.types.pubsub_pb2.PubsubMessage()
    msg.data = self.data
    for key, value in iteritems(self.attributes):
      msg.attributes[key] = value
    msg.ordering_key = self.ordering_key
    return msg.SerializeToString()

但是,当我查看主题中的结束消息时,ordering_key 似乎消失了。在最坏的情况下,我想我也可以使用 PubSub 客户端来发布消息。 但是,如果有人能为我指明这种改变的正确方向,那就更好了。我知道 apache Beam 项目的贡献者一定做了类似的事情,因为他们不久前已经包含了更改 PubSub 消息属性的功能。

更新:Apache Beam 2.24.0 依赖于旧版本的 PubSub 客户端库。我认为是因为他们想延长对 Python 2 的支持时间。但是,一切可能会在 10 月 7 日左右结束(至少对于 Google 来说,他们在该日期之后停止支持 Python 2 for Dataflow)。其他人可能需要等待 2.24.0 之后的任何版本。

作为解决方法,我已在 Apache Beam 2.24.0 之上成功安装了最新的 PubSub 客户端库。并创建新的自定义 PubSub IO 作为 DoFn(您只需覆盖设置方法并在其中创建发布者客户端)。我现在可以使用订购密钥发布消息。但是,我不确定是否会因为我的更改而代理任何内容,这对于演示目的来说是可以的。

 def setup(self):
        publisher_options = pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True
        )
        self.publisher = pubsub_v1.PublisherClient(
            publisher_options=publisher_options,
            batch_settings=pubsub_v1.types.BatchSettings()
        )
python-3.x apache-beam google-cloud-pubsub dataflow
1个回答
0
投票

我在使用 apache-beam 2.49.0 和 python sdk 3.11 时遇到同样的问题。

基本上,按照官方文档中描述的步骤,我创建了一个 PubsubMessage 对象,还提供了 ordering_key。

最终结果是消息被正确发送到 PubSub,但没有分配任何排序键:ordering_key 在 PubSub UI 上始终不可见,而如果您使用不同的客户端,例如 google cli,我可以在消息。

您知道可能出现什么问题吗?

遵循所使用的代码片段:

pipeline
| "Convert encoded message to PubsubMessage Object" >> beam.Map(lambda encodedMessage:beam.io.PubsubMessage(data=encodedMessage, attributes={"name":"test"}, ordering_key="testKey"))

| "Publish message to PubSub" >> beam.io.WriteToPubSub(
            topic=f"projects/{configs['project']}/topics/{topic}", with_attributes=True)
) ```
© www.soinside.com 2019 - 2024. All rights reserved.