我在 App Engine 标准环境上使用 python 运行时。
我在代码中使用一些异步操作,引用教程:
您可以通过执行以下操作来加快您的应用程序速度 数据存储操作与其他操作并行,或者执行一些操作 数据存储操作彼此并行。
Tasklet 是一种无需线程即可编写并发函数的方法; tasklet 由事件循环执行
https://cloud.google.com/appengine/docs/python/ndb/async
这都是关于并行执行的,但我不明白这种并行性在同步Python代码中是如何工作的。
事件循环如何工作?它是否在单独的进程/线程中运行? 它执行任何真正并发的执行吗?或者这只是创建批处理 RPC 的便捷方法,它在第一个
get_result()
调用上同步运行?
https://cloud.google.com/appengine/docs/python/refdocs/modules/google/appengine/ext/ndb/eventloop
正如您怀疑的那样,异步操作只是在阻止第一个(以及所有后续的每个)
get_result()
调用之前创建和启动多个 RPC(可能会也可能不会批处理,这不是强制性的)的一种方法。这适用于大多数/所有基于 RPC 的 GAE 基础设施(ndb、memcache、urlfetch 等)
如果您查看大多数调用(支持异步版本)的非异步版本的实现,您会发现它们实际上只是紧随其后的异步版本调用,然后是
get_result()
调用,例如:
@classmethod
def _get_or_insert(*args, **kwds):
"""Transactionally retrieves an existing entity or creates a new one.
(snip)
"""
cls, args = args[0], args[1:]
return cls._get_or_insert_async(*args, **kwds).get_result()
get_or_insert = _get_or_insert
至于运行事件循环本身 - 看起来它是在您的应用程序通过
ndb
:进行第一个
get_event_loop()
RPC 调用(请求/线程)时透明启动的
def get_event_loop():
"""Return a EventLoop instance.
A new instance is created for each new HTTP request. We determine
that we're in a new request by inspecting os.environ, which is reset
at the start of each request. Also, each thread gets its own loop.
"""
ev = _state.event_loop
if not os.getenv(_EVENT_LOOP_KEY) and ev is not None:
ev.clear()
_state.event_loop = None
ev = None
if ev is None:
ev = EventLoop()
_state.event_loop = ev
os.environ[_EVENT_LOOP_KEY] = '1'
return ev
Google appengine 提供了一种通过使用 @ndb.tasklet 包装器来运行异步方法的方法。
任何用 @ndb.tasklet 包装的 python 方法,在执行时都会返回一个 future 对象。
from google.appengine.ext import ndb
@ndb.tasklet
def sum_async(a, b):
raise ndb.Return(a+b)
result = sum_async(1,2) # "result" will contain future object instead of original answer 3
当调用
sum_async()
方法时,它将返回一个 ndb.future
对象。要从此方法获得实际结果,我们可以使用 sum_async(1,2).get_result()
。
但是如果函数是以下形式的生成器:
from google.appengine.ext import ndb
@ndb.tasklet
def generic_sum_async(*agrs):
result= sum(*args)
raise ndb.Return(result)
@ndb.tasklet
def sum_async_generator(a,b):
result = yield generic_sum(a,b)
raise ndb.Return(result)
sum_result = sum_async_generator(a,b) # Will return a future object which is inserted into eventloop.
当调用
sum_async_generator()
方法时,它会像前面的示例一样返回一个 future 对象,但是这次 sum_async_generator()
是一个 生成器函数。如果异步方法是生成器,它将将该方法的未来对象插入到 event_loop
中(在 eventloop 类中,这称为 current
队列)。当在此 future 对象上调用 .get_result()
方法时,事件循环将开始执行其中的 future,完成后它将为此父 future 对象设置结果。
如果异步方法仅包含计算操作,则执行时间将类似于在 python 方法中同步执行相同操作。但当我们在异步方法内进行 RPC 调用时,主要优势就出现了。
对于以下伪代码
@ndb.tasklet
def rpc_async_method():
a = yield method1()
rpc_result = yield rpc_call_to_read_from_database()
b = yield method2()
raise ndb.Return(a,b,rpc_result)
如果上述函数不是异步的,那么程序执行将在
rpc_call_to_read_from_database()
方法处等待,直到完成。之后执行将移至method2()
。
但对于异步情况,eventloop 会识别 rpc 相关方法并将其放入单独的队列中(在 eventloop 类中称为 rpc = {}
,后跟内部 eventloop 逻辑),以便其执行不会阻塞下面执行的其他异步方法它。
ndb.tasklet
还有一个multiFuture
,通过它我们可以并行执行多个异步方法,如下所示:
futs = [async_method_1(), async_method_2(), async_method_3()]
yield futs
通过这种方式,google appengine 重写了
yield
和 raise
运算符来执行 python 中的异步操作。
为了检查如何在事件循环中执行任何异步方法,我在此库中提供了一个装饰器:https://github.com/Manas-Chinta/google_appengine_extensions。
这个
@ELTracer()
装饰器将观察事件循环,并在任何 future(生成器或 rpc 相关的 future)添加到 eventloop 或任何 future 正在从 eventloop 执行时打印。