我目前正在迁移一些曾经阻塞的 Python 代码,以将
asyncio
与 async/await
一起使用。一次需要迁移很多代码,所以我更喜欢逐步迁移并有指标。考虑到这一点,我想创建一个装饰器来包装一些函数并知道它们阻塞事件循环多长时间。例如:
def measure_blocking_code(f):
def wrapper(*args, **kwargs):
# ?????
# It should measure JUST 1 second
# not 5 which is what the whole async function takes
return wrapper
@measure_blocking_code
async def my_function():
my_blocking_function() # Takes 1 seconds
await my_async_function() # Takes 2 seconds
await my_async_function_2() # Takes 2 seconds
我知道事件循环有一个调试函数已经报告了这一点,但我需要获取特定函数的信息。
这个装饰器完成了这项工作:
def measure_blocking_code(f):
async def wrapper(*args, **kwargs):
t = 0
coro = f()
try:
while True:
t0 = time.perf_counter()
future = coro.send(None)
t1 = time.perf_counter()
t += t1 - t0
while not future.done():
await asyncio.sleep(0)
future.result() # raises exceptions if any
except StopIteration as e:
print(f'Function took {t:.2e} sec')
return e.value
return wrapper
此解决方法利用了 cPython 中
asyncio
实现中使用的约定。这些约定是 PEP-492 的超集。换句话说:
asyncio
coro 对象(coro
)可以通过调用 .send()
成员来执行。这只会运行阻塞代码,直到异步调用产生 Future 对象为止。仅通过测量.send()
所花费的时间,就可以确定阻塞代码的持续时间。
我终于找到方法了。我希望它对某人有帮助
import asyncio
import time
def measure(f):
async def wrapper(*args, **kwargs):
coro_wrapper = f(*args, **kwargs).__await__()
fut = asyncio.Future()
total_time = 0
def done(arg=None):
try:
nonlocal total_time
start_time = time.perf_counter()
next_fut = coro_wrapper.send(arg)
end_time = time.perf_counter()
total_time += end_time - start_time
next_fut.add_done_callback(done)
except StopIteration:
fut.set_result(arg)
except Exception as e:
fut.set_exception(e)
done()
res = await fut
print('Blocked for: ' + str(total_time) + ' seconds')
return res
return wrapper
基于上面来自The SWE的代码,我做了一些改进:
def measure1(f):
async def wrapper(*args, **kwargs):
coroutine = f(*args, **kwargs).__await__()
fut = asyncio.Future()
s = time.perf_counter()
def done(arg=None):
try:
next_ = coroutine.send(arg)
next_.add_done_callback(done)
except StopIteration as e:
print(round(time.perf_counter() - s, 2))
fut.set_result(e.value)
done()
return await fut
return wrapper