Celery: stop accepting new tasks when too many are queued

Question

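I have a Flask REST API that uses Celery to run requests asynchronously.

The idea is that an async=1 query parameter indicates the request should be processed asynchronously (immediately returning a task ID that the client will use later).

At the same time, I want to stop accepting new tasks when too many are already waiting to be processed.

The code below works, but accepting_new_tasks() takes about 2 seconds, which is too slow.

Is there a Celery setting (or something else) that limits the number of waiting tasks, or a faster way to get the number of waiting tasks?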

import math

from celery import Celery
from flask import abort, Flask, jsonify, request


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


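def accepting_new_tasks(celery_app):
    # Both inspect calls broadcast to the workers and wait for their replies.
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    # Total number of worker processes across all nodes.
    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    # Tasks already received by the workers but not started yet.
    waiting_tasks = 0
    for reserved in nodes_reserved.values():
        waiting_tasks += len(reserved)

    # Accept new work only while the backlog is below a third of the pool size.
    return waiting_tasks < math.ceil(workers / 3)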

Tags: python asynchronous flask celery amqp

1 Answer

In the end I solved this by querying the RabbitMQ management API, as pointed out in https://stackoverflow.com/a/27074594/4183498.

import math

from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


def get_workers_count():
    # Sum the pool size reported by every worker node.
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    return workers


WORKERS_COUNT = get_workers_count()


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    # Ask the RabbitMQ management API for the current depth of the "celery" queue.
    auth = HTTPBasicAuth("guest", "guest")
    response = get(
        "http://localhost:15672/api/queues/my_vhost/celery",
        auth=auth,
    )
    waiting_tasks = response.json()["messages"]
    return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
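
Two details of the approach above can be checked in isolation: how the management API URL is built and how the threshold is applied. The following is a minimal sketch under a few assumptions. The helper names queue_url, waiting_messages and has_capacity are hypothetical and do not appear in the code above, and treating a missing "messages" field as 0 is an assumption rather than documented behavior.

```python
import math
from urllib.parse import quote


def queue_url(base_url: str, vhost: str, queue: str) -> str:
    """Build the management API URL for a single queue.

    The vhost and queue name are percent-encoded, so the default
    vhost "/" ends up as "%2F" in the path.
    """
    base = base_url.rstrip("/")
    return f"{base}/api/queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"


def waiting_messages(queue_info: dict) -> int:
    """Read the queue depth from the JSON object returned for a queue.

    Assumption: if the "messages" field is absent, count it as 0.
    """
    return int(queue_info.get("messages", 0))


def has_capacity(waiting: int, workers: int) -> bool:
    """Same rule as above: accept while the backlog is under a third of the pool."""
    return waiting < math.ceil(workers / 3)


if __name__ == "__main__":
    print(queue_url("http://localhost:15672", "my_vhost", "celery"))
    print(has_capacity(waiting_messages({"messages": 2}), workers=8))
```

With 8 worker processes the threshold is ceil(8 / 3) = 3, so requests are accepted while fewer than 3 messages are in the queue. Note that with 0 workers the check never passes, and that the default vhost "/" has to be written as %2F in the URL.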