RabbitMQ 和 GCP Pub/Sub 集成

问题描述 投票:0回答:1

我正在致力于集成 RabbitMQ 和 GCP Pub/Sub。 我正在使用 Python 的

Pika
库。通过
BlockingConnection
建立连接。
我的消费者作为单独的线程运行。最大预期消息工作负载高达 100 条消息/秒。

根据我执行的一些初步测试,解决方案似乎有效,但我对回调函数的构造方式有一些疑问。我正在调用

publish
方法发布到 GCP 中的 Pub/Sub,然后在块尝试中,
basic_ack
处理已处理的消息。

如果有人在这方面有经验,我能否要求对我的解决方案发表一些意见,并可能提供一些如何实施的示例。

    def consume_data_callback(self, basic_deliver, body):
        
        # ... some code

        future = self.publisher.publish(topic_path, self.payload.SerializeToString())
        try:
            message_id = future.result(timeout=1)
            self.channel.basic_ack(basic_deliver.delivery_tag)
        except Exception as e:
            future.cancel()
            _logger.error("Result after publishing Pub/Sub with: {}".format(e)) 

谢谢您的回答。

google-cloud-platform rabbitmq google-cloud-pubsub pika
1个回答
0
投票

我个人还没有尝试过,但我确实搜索了可能对您有帮助的参考资料。这篇RabbitMQ - 发布/订阅文章有一个回调代码示例,可以帮助您确认您所编写的内容是否是合适的解决方案。

GitHub 完整代码版本:

#!/usr/bin/env python
import os
import pika
import sys


def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        print(f" [x] {body.decode()}")

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
© www.soinside.com 2019 - 2024. All rights reserved.