我正在尝试使用 Apache Beam (https://cloud.google.com/pubsub/docs/publisher) 将带有 ordering_key 的 Google PubSub 消息写入主题。尽管带有 ordering_key 的 Google Pubsub 是测试版功能,但我可以使用普通的 PubSub 客户端库发布消息。我希望在 Apache Beam 中也能做到这一点。然而,它似乎没有任何可用于 Python Apache Beam 库的东西。 我一直在尝试覆盖beam.io.WriteToPubSub(通过更改_to_proto_str)以使用ordering_key编写protobuf消息(https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google .pubsub.v1.PubsubMessage)。最后消息会是这样的
"data": string,
"attributes": {
string: string,
...
},
"messageId": string,
"publishTime": string,
"ordering_key": string
}
sdks.python.apache_beam.io.gcp.pubsub.PubsubMessage._to_proto_str
def _to_proto_str(self):
msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.data = self.data
for key, value in iteritems(self.attributes):
msg.attributes[key] = value
msg.ordering_key = self.ordering_key
return msg.SerializeToString()
但是,当我查看主题中的结束消息时,ordering_key 似乎消失了。在最坏的情况下,我想我也可以使用 PubSub 客户端来发布消息。 但是,如果有人能为我指明这种改变的正确方向,那就更好了。我知道 apache Beam 项目的贡献者一定做了类似的事情,因为他们不久前已经包含了更改 PubSub 消息属性的功能。
更新:Apache Beam 2.24.0 依赖于旧版本的 PubSub 客户端库。我认为是因为他们想延长对 Python 2 的支持时间。但是,一切可能会在 10 月 7 日左右结束(至少对于 Google 来说,他们在该日期之后停止支持 Python 2 for Dataflow)。其他人可能需要等待 2.24.0 之后的任何版本。
作为解决方法,我已在 Apache Beam 2.24.0 之上成功安装了最新的 PubSub 客户端库。并创建新的自定义 PubSub IO 作为 DoFn(您只需覆盖设置方法并在其中创建发布者客户端)。我现在可以使用订购密钥发布消息。但是,我不确定是否会因为我的更改而代理任何内容,这对于演示目的来说是可以的。
def setup(self):
publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True
)
self.publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options,
batch_settings=pubsub_v1.types.BatchSettings()
)
我在使用 apache-beam 2.49.0 和 python sdk 3.11 时遇到同样的问题。
基本上,按照官方文档中描述的步骤,我创建了一个 PubsubMessage 对象,还提供了 ordering_key。
最终结果是消息被正确发送到 PubSub,但没有分配任何排序键:ordering_key 在 PubSub UI 上始终不可见,而如果您使用不同的客户端,例如 google cli,我可以在消息。
您知道可能出现什么问题吗?
遵循所使用的代码片段:
pipeline
| "Convert encoded message to PubsubMessage Object" >> beam.Map(lambda encodedMessage:beam.io.PubsubMessage(data=encodedMessage, attributes={"name":"test"}, ordering_key="testKey"))
| "Publish message to PubSub" >> beam.io.WriteToPubSub(
topic=f"projects/{configs['project']}/topics/{topic}", with_attributes=True)
) ```