我正在尝试通过几个烧瓶端点从芹菜节拍中动态添加和删除任务。我创建了一个名为myApp的简单项目,并创建了一个名为flaskr的程序包(是的,类似于本教程),其中包含三个文件。
myApp flaskr __init__.py routes.py tasks.py wsgi.py
这是端点代码
@route_blueprint.route('/myApp/add_task') def add(): print(celery.conf.beat_schedule) print(hex(id(celery))) celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10') print(celery.conf.beat_schedule) return ""
我进入PyCharm控制台,从其中一个控制台运行像这样的gunicorn:
gunicorn wsgi:app -b localhost:8000
从另一个控制台选项卡,我也像这样运行Celery
celery-flaskr.celery worker --loglevel = info
并且从另一个我像这样跳动
celery -A flaskr.celery beat -l = debug
当我击中端点时,在控制台中我可以看到正在添加的任务,但beat从未发送过。
我怀疑flask正在设置任务的是另一个celery_app实例,所以我打印了我尝试修改的celery对象的照片,是的,它是一个不同的对象。
flaskr:0x110048978
-------------- [email protected] v4.3.0 (rhubarb) ---- **** ----- --- * *** * -- Darwin-18.6.0-x86_64-i386-64bit 2019-08-26 17:19:47 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: flaskr:0x110048978 - ** ---------- .> transport: redis://localhost:6379/2 - ** ---------- .> results: redis://localhost:6379/2 - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
这是来自端点的
0x101e31e80
我对python相当陌生,但是我想这很有意义,因为我从两个不同的过程中触发了相同的代码,一个是芹菜工人触发的,另一个是烧瓶/独角兽触发的,所以他们再也看不到每个其他。
是否有办法让flask访问从celery命令行实例初始化的实例,或者我应该从flask内部启动工作进程? (我在芹菜和烧瓶的任何文件中都没有看到它)
__ init __。py
from flask import Flask from celery import Celery import config celery = Celery(__name__, backend=config.CELERY_BACKEND, broker=config.CELERY_BROKER, include=['flaskr.tasks']) @celery.task def asd(x, y): print('ADD') # raise exceptions.Retry(20) return x + y def create_app(test_config=None): # create and configure the app app = Flask(__name__) from .routes import route_blueprint app.register_blueprint(route_blueprint) return app
tasks.py
from __future__ import absolute_import, unicode_literals from . import celery import logging.config logging.config.fileConfig('logging.conf') logger = logging.getLogger('myApp') @celery.task def add(x, y): print('ADD') # raise exceptions.Retry(20) return x + y @celery.task(bind=True) def see_you(self, x, y): logger.info('Log de see_you') print(x) # print("See you in ten seconds!") print('Initializing from tasks') print(hex(id(celery))) print('beat schedule: ' + str(celery.conf.beat_schedule)) # celery.add_periodic_task(10.0, add.s(1, 2), name='add every 10') # print(str(celery.conf.beat_schedule))
routes.py
from flask import Blueprint
import logging.config
from . import tasks
from . import celery
route_blueprint = Blueprint('route_blueprint', __name__,)
logging.config.fileConfig('logging.conf')
logger = logging.getLogger('myApp')
@route_blueprint.route('/myApp/health')
def health():
return "Health ok"
@route_blueprint.route('/myApp/add_task')
def add():
print(celery.conf.beat_schedule)
# tasks.add.delay(55, 2)
print(hex(id(celery)))
celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10')
print(celery.conf.beat_schedule)
return "okkk"
我正在尝试通过几个烧瓶端点从芹菜节拍中动态添加和删除任务。我创建了一个名为myApp的简单项目,并创建了一个名为flaskr的软件包(是的,类似于本教程),其中包含三个...
你有运气吗?我陷入了类似的困境。