I'm running into a problem: memory allocation spikes roughly 3 times a day. I have a Cloud Run instance with a `/webhook` endpoint that receives requests and publishes them in batches to a Pub/Sub topic. I suspect I need some way to bound the size of the `publish_futures` list, but I'm not sure how to do that, or whether there's a better approach altogether.
The Docker container is built with:

```dockerfile
FROM python:3.10-slim
```
`requirements.txt`:

```text
gunicorn[gevent]==20.1.0
google-cloud-pubsublite==1.2.0
google-cloud-logging
certifi
requests
```
`main.py`:

```python
import os
import sys
import logging
from google.cloud import logging as cloudlogging
import datetime
import json
from flask import Flask, request, abort, Response
from concurrent import futures
from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,  # default 100
    max_bytes=1024,    # default 1 MB
    max_latency=3,     # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path("project", "topic")
publish_futures = []

def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = future.result()

app = Flask(__name__)

lg_client = cloudlogging.Client()
lg_client.setup_logging(log_level=logging.ERROR)

@app.route("/webhook", methods=["GET"])
def index():
    """Event Route.
    get:
      summary: Event Endpoint
      description: Receives a single event with a list of parameters
    """
    data = request.args
    event_kv_list = data.to_dict()
    try:
        msg = {}
        msg['data'] = json.dumps(event_kv_list).encode('utf-8')
        msg['attributes'] = {}
        publish_future = publisher.publish(topic_path, msg['data'])
        publish_future.add_done_callback(callback)
        publish_futures.append(publish_future)
    except Exception as e:
        print(e)
    return Response("200")

if __name__ == "__main__":
    app.run(debug=False, host="0.0.0.0",
            port=int(os.environ.get("PORT", 8080)))
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
```
I've tried tuning `batch_settings` and the concurrency to reduce the backlog, but I suspect the problem is in the code itself.
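To show what I mean by "limiting the list": the idea I had is to remove each future from `publish_futures` as soon as it completes, and block the handler when too many publishes are in flight. This is a minimal, stdlib-only sketch of that idea, not something I've verified in production; `MAX_PENDING`, `track`, and `apply_backpressure` are names I made up, and the lock is there because Pub/Sub completion callbacks fire on background threads. Is something like this the right direction, or does the client offer a built-in way to do it?

```python
import threading
from concurrent import futures

publish_futures = []             # same global list as in main.py
futures_lock = threading.Lock()  # callbacks run on background threads
MAX_PENDING = 1000               # hypothetical cap on in-flight publishes

def track(future):
    """Register a publish future and drop it from the list once it completes."""
    with futures_lock:
        publish_futures.append(future)

    def _done(f):
        with futures_lock:
            try:
                publish_futures.remove(f)
            except ValueError:
                pass  # already removed
    future.add_done_callback(_done)

def apply_backpressure():
    """If too many publishes are in flight, block until they all drain."""
    with futures_lock:
        pending = list(publish_futures)
    if len(pending) >= MAX_PENDING:
        futures.wait(pending, return_when=futures.ALL_COMPLETED)
```

In the handler I would then call `track(publish_future)` instead of appending directly, and `apply_backpressure()` before publishing.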