我正在使用Google PubSub Publisher触发通知,并在Google Cloud Storage Bucket中上载文档时将消息发送给订户。
我通过->创建了通知主题> [[gsutil通知创建-t [TOPIC_NAME] -f json -e OBJECT_FINALIZE gs:// [BUCKET_NAME]
我的订阅者功能是:**def callback(message):
try:
#storage_client = storage.Client.from_service_account_json('storage_service_key.json')
print('Received message: {}'.format(message.data.decode("utf-8")))
data = json.loads(message.data.decode("utf-8"))
filename = data['name']
file_name = re.search(r'/(.*)', filename).group(1)
#filelink = data['selfLink']
print("Processing the file : {}".format(file_name))
path = "sample/"+file_name
download_files(path, file_name)
rming.image_remover(file_name) ## my custom function
message.ack()
os.remove(file_name)
except Exception as error_message:
print("Error in callback method: {}".format(error_message))
pass
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(proj_name, sub_name)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback,flow_control=flow_control
)
print('Listening for messages on: {}'.format(subscription_path))
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
#flag=0
streaming_pull_future.result()
except Exception as error_message: # noqa
print("exception occured while handling subscription: {}").format(error_message)
pass
**我想在Docker容器中部署此Subscriber Function。然后它将使订阅服务器运行24/7。[当没有可用消息时,我可以使订阅者进入睡眠状态/(不发出拉取请求),或者我的意思是,当没有传入消息时,我可以使订阅者在空闲时间进入睡眠状态。
感谢您的任何帮助!!!!
--allow-unauthenticated
参数,请创建一个角色为run.invoker
和set this service account on your Push subscription for allowing PubSub to call your Cloud Run的服务帐户。正确遵循文档,您必须授予具有令牌创建者角色的PubSub服务代理。