我有一些代码,需要执行瓶返回响应后。我不认为这是很复杂,无法建立像芹菜任务队列它。关键的要求是,必须瓶运行此功能之前返回到客户端的响应。它不能等待函数来执行。
大约有一些这方面存在的问题,但没有一个答案似乎解决运行任务的响应发送到客户端后,他们仍然执行同步,然后返回响应。
长话短说是瓶不提供任何的特殊能力做到这一点。对于简单的一次性的任务,考虑Python的多线程,如下图所示。对于更复杂的配置,使用像RQ或芹菜任务队列。
了解瓶和提供的功能,为什么他们没有完成预期目标,这一点很重要。所有这些都是在其他情况下有用,是良好的阅读,但不与后台任务帮助。
after_request
handler瓶的after_request
处理程序,如this pattern for deferred request callbacks和this snippet on attaching different functions per request详细,将请求传递给回调函数。预期用途的情况下是修改请求,如附加的cookie。
因此,请求将等着为这些处理执行完毕,因为期望是请求本身会发生变化。
teardown_request
handler这类似于after_request
,但teardown_request
没有收到request
对象。因此,这意味着它不会等待请求,对不对?
这似乎是一个解决方案,因为this answer to a similar Stack Overflow question建议。而且,由于瓶的文件解释说,teardown callbacks are independent of the actual request并没有收到请求上下文,你有充分的理由相信这一点。
不幸的是,teardown_request
仍然是同步的,它只是发生在合剂的请求处理以后的一部分,当请求不再修改。瓶仍将等待拆卸功能返回响应之前完成,为this list of Flask callbacks and errors使然。
烧瓶可以通过使发电机以Response()
流响应,this Stack Overflow answer to a similar question建议。
随着流媒体,客户机开始接收响应请求结束之前。但是,请求仍然运行同步,所以直到流结束,工人在处理请求忙。
This Flask pattern for streaming包括有关使用stream_with_context()
,这是必要的包括请求上下文一些文档。
瓶不提供解决方案来运行功能,在后台,因为这不是烧瓶的责任。
在大多数情况下,解决这个问题的最好方法是使用一个任务队列,如RQ或芹菜。这些管理像配置,调度技巧性的东西,并为you.This分配工人是最常见的回答这样的问题,因为这是最正确的,并迫使你在你考虑上下文的方式等设置的东西,正确。
如果您需要在后台运行的功能,不希望建立一个队列来管理这一点,你可以使用Python内置的threading
或multiprocessing
产卵后台工作。
您不能访问request
或他人的后台任务瓶的螺纹当地人,因为该请求将不活动出现。相反,通过您从视图中,当您创建它的后台线程需要的数据。
@app.route('/start_task')
def start_task():
def do_work(value):
# do something that takes a long time
import time
time.sleep(value)
thread = Thread(target=do_work, kwargs={'value': request.args.get('value', 20))
thread.start()
return 'started'
瓶是WSGI app,因此它不能从根本上处理响应后,任何东西。这就是为什么没有这样的处理程序存在,WSGI应用程序本身仅用于构建响应iterator对象的WSGI服务器负责。
一个WSGI server但是(如gunicorn)可以很容易提供这种功能,但捆绑应用程序服务器是有很多原因一个非常糟糕的主意。
对于这个确切原因,WSGI提供Middleware一个规范,并WERKZEUG提供了许多帮手来简化常见的中间件功能。其中是ClosingIterator类,它允许你钩子方法直到请求被关闭之后,执行响应迭代器的close
方法。
这里做一个烧瓶延伸天真after_response
实现的例子:
import traceback
from werkzeug.wsgi import ClosingIterator
class AfterResponse:
def __init__(self, app=None):
self.callbacks = []
if app:
self.init_app(app)
def __call__(self, callback):
self.callbacks.append(callback)
return callback
def init_app(self, app):
# install extension
app.after_response = self
# install middleware
app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)
def flush(self):
for fn in self.callbacks:
try:
fn()
except Exception:
traceback.print_exc()
class AfterResponseMiddleware:
def __init__(self, application, after_response_ext):
self.application = application
self.after_response_ext = after_response_ext
def __call__(self, environ, after_response):
iterator = self.application(environ, after_response)
try:
return ClosingIterator(iterator, [self.after_response_ext.flush])
except Exception:
traceback.print_exc()
return iterator
您可以使用该扩展是这样的:
import flask
app = flask.Flask("after_response")
AfterResponse(app)
@app.after_response
def say_hi():
print("hi")
@app.route("/")
def home():
return "Success!\n"
当你卷曲“/”你会看到在你的日志中的以下内容:
127.0.0.1 - - [24/Jun/2018 19:30:48] "GET / HTTP/1.1" 200 -
hi
这解决了问题,根本不引入任何线程(GIL ??),或具有安装和管理任务队列和客户端软件。
您可以使用此代码我试图it.It作品。
这段代码将打印字符串“消息”。后3秒,从调度时间。你可以根据你的要求改变的时候你的自我。
import time, traceback
import threading
def every(delay,message, task):
next_time = time.time() + delay
time.sleep(max(0, next_time - time.time()))
task(message)
def foo(message):
print(message+" :foo", time.time())
def main(message):
threading.Thread(target=lambda: every(3,message, foo)).start()
main("message")
中间件解决方案瓶蓝图
这是马修的故事(这是一个完美的解决方案恕我直言 - 感谢马修)提出了同样的解决方案,适用于瓶蓝图。这里的秘诀是让使用CURRENT_APP代理应用程序上下文的保持。阅读起来here了解更多信息(http://flask.pocoo.org/docs/1.0/appcontext/)
让我们假设AfterThisResponse&AfterThisResponseMiddleware类被放置在一个模块中的.utils.after_this_response.py
然后在瓶创建对象时,你可能有,如...
__init__.朋友
from api.routes import my_blueprint
from .utils.after_this_response import AfterThisResponse
app = Flask( __name__ )
AfterThisResponse( app )
app.register_blueprint( my_blueprint.mod )
然后你的蓝图模块中...
啊_blueprint.朋友
from flask import Blueprint, current_app
mod = Blueprint( 'a_blueprint', __name__, url_prefix=URL_PREFIX )
@mod.route( "/some_resource", methods=['GET', 'POST'] )
def some_resource():
# do some stuff here if you want
@current_app.after_this_response
def post_process():
# this will occur after you finish processing the route & return (below):
time.sleep(2)
print("after_response")
# do more stuff here if you like & then return like so:
return "Success!\n"