设置Python Pub / Sub异步拉订阅者线程计数

问题描述 投票:2回答:2

我使用Python实现了异步拉取订阅者。这是基本代码

def receive_messages(project, subscription_name):

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print ("A")
        time.sleep(2)
        print('Received message: {}'.format(message))
        message.ack()
        print ("B")

    subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

我需要打印像

一个,

信息

一个

信息

(我需要按顺序运行)或通过给定的没有线程接收消息。我找不到限制线程的方法。由于许多线程,我的程序给出了Segmentation fault。

我如何控制没有线程来接收消息。

python multithreading python-3.x asynchronous google-cloud-pubsub
2个回答
1
投票

如果您需要处理回调以顺序运行,那么最好使用消息传递模型而不是修改订阅者内部。如果将收到的消息推送到显式的queue.Queue,则可以确保只有一个工作程序从此队列中撤出,并且一次只处理一个工作程序。但请注意,虽然这为您提供了“一次一个”保证,但如果只有一个订阅作业,则它不会为您提供任何订购保证。消息仍然可以相对于它们发布的顺序以任意顺序处理。


0
投票

使用Policy可以解决问题

from google.cloud import pubsub_v1
from concurrent import futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)

def callback(message):
        print (str(message.data) + " " + str(threading.current_thread()))
        message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)

我们可以使用max_workers设置最大线程数。还可以设置流量控制设置。

© www.soinside.com 2019 - 2024. All rights reserved.