我使用Python实现了异步拉取订阅者。这是基本代码
def receive_messages(project, subscription_name):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print ("A")
time.sleep(2)
print('Received message: {}'.format(message))
message.ack()
print ("B")
subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
我需要打印像
一个,
信息
乙
一个
信息
乙
(我需要按顺序运行)或通过给定的没有线程接收消息。我找不到限制线程的方法。由于许多线程,我的程序给出了Segmentation fault。
我如何控制没有线程来接收消息。
如果您需要处理回调以顺序运行,那么最好使用消息传递模型而不是修改订阅者内部。如果将收到的消息推送到显式的queue.Queue,则可以确保只有一个工作程序从此队列中撤出,并且一次只处理一个工作程序。但请注意,虽然这为您提供了“一次一个”保证,但如果只有一个订阅作业,则它不会为您提供任何订购保证。消息仍然可以相对于它们发布的顺序以任意顺序处理。
使用Policy可以解决问题
from google.cloud import pubsub_v1
from concurrent import futures
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)
def callback(message):
print (str(message.data) + " " + str(threading.current_thread()))
message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)
我们可以使用max_workers设置最大线程数。还可以设置流量控制设置。