Pub/Sub memory allocation spikes via a Cloud Run service


I'm running into an issue where memory allocation spikes roughly three times a day. I set up a Cloud Run instance with a /webhook endpoint that receives requests and publishes them in batches to a Pub/Sub topic. I suspect I need to limit the size of the publish_futures list, but I'm not sure how to do that, or whether there's a better approach altogether.
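
For example, something along these lines is roughly what I mean. This is only a sketch that reuses publisher, topic_path, callback and publish_futures from main.py below, and MAX_PENDING is an arbitrary number:

from concurrent import futures

MAX_PENDING = 500  # arbitrary cap; would need tuning for real traffic

def publish_bounded(data: bytes) -> None:
    """Publish one message and flush the backlog whenever the
    pending-futures list grows past MAX_PENDING."""
    publish_future = publisher.publish(topic_path, data)
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)
    if len(publish_futures) >= MAX_PENDING:
        # Block until all outstanding publishes finish, then drop the
        # references so the completed futures can be garbage-collected.
        futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
        publish_futures.clear()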

The Docker container is built from

FROM python:3.10-slim

requirements.txt

gunicorn[gevent]==20.1.0
google-cloud-pubsub  # provides google.cloud.pubsub_v1 (pubsublite is a different product)
google-cloud-logging
flask  # imported by main.py
certifi
requests

main.py

import os
import sys
import logging
from google.cloud import logging as cloudlogging
import datetime
import json
from flask import Flask, request, abort, Response

from concurrent import futures
from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,  # messages per batch (default 100)
    max_bytes=1024,  # bytes per batch (default 1 MB)
    max_latency=3,  # seconds before a partial batch is sent (default 10 ms)
)

publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path("project", "topic")
publish_futures = []

def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = future.result()

app = Flask(__name__)
lg_client = cloudlogging.Client()
lg_client.setup_logging(log_level=logging.ERROR)

@app.route("/webhook", methods=["GET"])
def index():
    """Event Route.

    get:
      summary: Event Endpoint
      description: Receives a single event with a list of parameters
    """
    data = request.args
    event_kv_list = data.to_dict()
    
    try:
        msg = {}
        msg['data'] = json.dumps(event_kv_list).encode('utf-8')
        msg['attributes'] = {}
        publish_future = publisher.publish(topic_path, msg['data'])
        publish_future.add_done_callback(callback)
        publish_futures.append(publish_future)
    except Exception as e:
        print(e)

    return Response("200")

if __name__ == "__main__":
    app.run(debug=False, host="0.0.0.0",
            port=int(os.environ.get("PORT", 8080)))
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
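
As far as I can tell, the client library also offers publisher-side flow control that caps outstanding messages without tracking futures by hand. If I read the docs right, it would look something like this (the limits are made-up numbers):

from google.cloud import pubsub_v1

# Publisher flow control: publish() blocks once too much data is
# outstanding, which should keep memory bounded on its own.
flow_control = pubsub_v1.types.PublisherFlowControl(
    message_limit=500,  # made-up cap on outstanding messages
    byte_limit=10 * 1024 * 1024,  # made-up cap of 10 MiB outstanding
    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
)

publisher = pubsub_v1.PublisherClient(
    batch_settings,
    publisher_options=pubsub_v1.types.PublisherOptions(flow_control=flow_control),
)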

I tried using batch_settings and concurrency to keep the count down, but I suspect the problem is in the code itself.
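
Another idea would be to have the done-callback prune the list itself, guarded by a lock since callbacks fire on the client's background threads; again, only a sketch:

import threading

futures_lock = threading.Lock()  # publish callbacks run on background threads

def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    future.result()  # re-raises any publish error
    with futures_lock:
        if future in publish_futures:
            # Drop the reference as soon as this publish completes,
            # so the list never grows without bound.
            publish_futures.remove(future)

(The append in the request handler would need to take the same lock.)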

python-3.x flask google-cloud-pubsub google-cloud-run concurrent.futures