芹菜和烧瓶,同一芹菜应用程序实例

问题描述 投票:1回答:1

我正在尝试通过几个烧瓶端点从芹菜节拍中动态添加和删除任务。我创建了一个名为myApp的简单项目,并创建了一个名为flaskr的程序包(是的,类似于本教程),其中包含三个文件。

myApp
    flaskr
        __init__.py
        routes.py
        tasks.py
    wsgi.py

这是端点代码

@route_blueprint.route('/myApp/add_task')
def add():
    print(celery.conf.beat_schedule)
    print(hex(id(celery)))
    celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10')
    print(celery.conf.beat_schedule)
    return ""

我进入PyCharm控制台,从其中一个控制台运行像这样的gunicorn:

gunicorn wsgi:app -b localhost:8000

从另一个控制台选项卡,我也像这样运行Celery

celery-flaskr.celery worker --loglevel = info

并且从另一个我像这样跳动

celery -A flaskr.celery beat -l = debug

当我击中端点时,在控制台中我可以看到正在添加的任务,但beat从未发送过。

我怀疑flask正在设置任务的是另一个celery_app实例,所以我打印了我尝试修改的celery对象的照片,是的,它是一个不同的对象。

这是从芹菜开始的

flaskr:0x110048978

 -------------- [email protected] v4.3.0 (rhubarb)
---- **** ----- 
--- * ***  * -- Darwin-18.6.0-x86_64-i386-64bit 2019-08-26 17:19:47
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         flaskr:0x110048978
- ** ---------- .> transport:   redis://localhost:6379/2
- ** ---------- .> results:     redis://localhost:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

这是来自端点的

0x101e31e80

问题

我对python相当陌生,但是我想这很有意义,因为我从两个不同的过程中触发了相同的代码,一个是芹菜工人触发的,另一个是烧瓶/独角兽触发的,所以他们再也看不到每个其他。

是否有办法让flask访问从celery命令行实例初始化的实例,或者我应该从flask内部启动工作进程? (我在芹菜和烧瓶的任何文件中都没有看到它)


这是完整代码

__ init __。py

from flask import Flask
from celery import Celery
import config

celery = Celery(__name__,
                backend=config.CELERY_BACKEND,
                broker=config.CELERY_BROKER,
                include=['flaskr.tasks'])


@celery.task
def asd(x, y):
    print('ADD')
    # raise exceptions.Retry(20)
    return x + y


def create_app(test_config=None):
    # create and configure the app
    app = Flask(__name__)

    from .routes import route_blueprint
    app.register_blueprint(route_blueprint)

    return app

tasks.py

from __future__ import absolute_import, unicode_literals
from . import celery
import logging.config


logging.config.fileConfig('logging.conf')
logger = logging.getLogger('myApp')


@celery.task
def add(x, y):
    print('ADD')
    # raise exceptions.Retry(20)
    return x + y


@celery.task(bind=True)
def see_you(self, x, y):
    logger.info('Log de see_you')
    print(x)
    # print("See you in ten seconds!")


print('Initializing from tasks')
print(hex(id(celery)))
print('beat schedule: ' + str(celery.conf.beat_schedule))
# celery.add_periodic_task(10.0, add.s(1, 2), name='add every 10')
# print(str(celery.conf.beat_schedule))

routes.py

from flask import Blueprint
import logging.config
from . import tasks
from . import celery


route_blueprint = Blueprint('route_blueprint', __name__,)

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('myApp')


@route_blueprint.route('/myApp/health')
def health():
    return "Health ok"


@route_blueprint.route('/myApp/add_task')
def add():
    print(celery.conf.beat_schedule)
    # tasks.add.delay(55, 2)
    print(hex(id(celery)))
    celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10')
    print(celery.conf.beat_schedule)
    return "okkk"
    

我正在尝试通过几个烧瓶端点从芹菜节拍中动态添加和删除任务。我创建了一个名为myApp的简单项目,并创建了一个名为flaskr的软件包(是的,类似于本教程),其中包含三个...

python flask celery celerybeat
1个回答
0
投票

你有运气吗?我陷入了类似的困境。

© www.soinside.com 2019 - 2024. All rights reserved.