appengine NDB 异步操作事件循环如何工作?

问题描述 投票:0回答:2

我在 App Engine 标准环境上使用 python 运行时。

我在代码中使用一些异步操作,引用教程:

  • 您可以通过执行以下操作来加快您的应用程序速度 数据存储操作与其他操作并行,或者执行一些操作 数据存储操作彼此并行。

  • Tasklet 是一种无需线程即可编写并发函数的方法; tasklet 由事件循环执行

https://cloud.google.com/appengine/docs/python/ndb/async

这都是关于并行执行的,但我不明白这种并行性在同步Python代码中是如何工作的。

事件循环如何工作?它是否在单独的进程/线程中运行? 它执行任何真正并发的执行吗?或者这只是创建批处理 RPC 的便捷方法,它在第一个

get_result()
调用上同步运行?

https://cloud.google.com/appengine/docs/python/refdocs/modules/google/appengine/ext/ndb/eventloop

python multithreading google-app-engine asynchronous app-engine-ndb
2个回答
1
投票

正如您怀疑的那样,异步操作只是在阻止第一个(以及所有后续的每个)

get_result()
调用之前创建和启动多个 RPC(可能会也可能不会批处理,这不是强制性的)的一种方法。这适用于大多数/所有基于 RPC 的 GAE 基础设施(ndb、memcache、urlfetch 等)

如果您查看大多数调用(支持异步版本)的非异步版本的实现,您会发现它们实际上只是紧随其后的异步版本调用,然后是

get_result()
调用,例如:

  @classmethod
  def _get_or_insert(*args, **kwds):
    """Transactionally retrieves an existing entity or creates a new one.    
    (snip)          
    """
    cls, args = args[0], args[1:]
    return cls._get_or_insert_async(*args, **kwds).get_result()
  get_or_insert = _get_or_insert

至于运行事件循环本身 - 看起来它是在您的应用程序通过

ndb
:
 进行第一个 
get_event_loop()

RPC 调用(请求/线程)时透明启动的
def get_event_loop():
  """Return a EventLoop instance.

  A new instance is created for each new HTTP request.  We determine
  that we're in a new request by inspecting os.environ, which is reset
  at the start of each request.  Also, each thread gets its own loop.
  """
  ev = _state.event_loop
  if not os.getenv(_EVENT_LOOP_KEY) and ev is not None:
    ev.clear()
    _state.event_loop = None
    ev = None
  if ev is None:
    ev = EventLoop()
    _state.event_loop = ev
    os.environ[_EVENT_LOOP_KEY] = '1'
  return ev

0
投票

Google appengine 提供了一种通过使用 @ndb.tasklet 包装器来运行异步方法的方法。

任何用 @ndb.tasklet 包装的 python 方法,在执行时都会返回一个 future 对象。

from google.appengine.ext import ndb

@ndb.tasklet
def sum_async(a, b):
    raise ndb.Return(a+b)

result = sum_async(1,2) # "result" will contain future object instead of original answer 3 

当调用

sum_async()
方法时,它将返回一个
ndb.future
对象。要从此方法获得实际结果,我们可以使用
sum_async(1,2).get_result()

但是如果函数是以下形式的生成器:

from google.appengine.ext import ndb

@ndb.tasklet
def generic_sum_async(*agrs):
    result= sum(*args)
    raise ndb.Return(result)

@ndb.tasklet
def sum_async_generator(a,b):
    result = yield generic_sum(a,b)
    raise ndb.Return(result)

sum_result = sum_async_generator(a,b) # Will return a future object which is inserted into eventloop. 

当调用

sum_async_generator()
方法时,它会像前面的示例一样返回一个 future 对象,但是这次
sum_async_generator()
是一个 生成器函数。如果异步方法是生成器,它将将该方法的未来对象插入到
event_loop
中(在 eventloop 类中,这称为
current
队列)。当在此 future 对象上调用
.get_result()
方法时,事件循环将开始执行其中的 future,完成后它将为此父 future 对象设置结果。

如果异步方法仅包含计算操作,则执行时间将类似于在 python 方法中同步执行相同操作。但当我们在异步方法内进行 RPC 调用时,主要优势就出现了。

对于以下伪代码

@ndb.tasklet
def rpc_async_method():
    a = yield method1()
    rpc_result = yield rpc_call_to_read_from_database()
    b = yield method2()
    raise ndb.Return(a,b,rpc_result)

如果上述函数不是异步的,那么程序执行将在

rpc_call_to_read_from_database()
方法处等待,直到完成。之后执行将移至
method2()
。 但对于异步情况,eventloop 会识别 rpc 相关方法并将其放入单独的队列中(在 eventloop 类中称为
rpc = {}
,后跟内部 eventloop 逻辑),以便其执行不会阻塞下面执行的其他异步方法它。

ndb.tasklet
还有一个
multiFuture
,通过它我们可以并行执行多个异步方法,如下所示:

futs = [async_method_1(), async_method_2(), async_method_3()]
yield futs

通过这种方式,google appengine 重写了

yield
raise
运算符来执行 python 中的异步操作。

为了检查如何在事件循环中执行任何异步方法,我在此库中提供了一个装饰器:https://github.com/Manas-Chinta/google_appengine_extensions

这个

@ELTracer()
装饰器将观察事件循环,并在任何 future(生成器或 rpc 相关的 future)添加到 eventloop 或任何 future 正在从 eventloop 执行时打印。

© www.soinside.com 2019 - 2024. All rights reserved.