在Python中使用asyncio时如何测量阻塞代码所花费的时间?

问题描述 投票:0回答:3

我目前正在迁移一些曾经阻塞的 Python 代码,以将

asyncio
async/await
一起使用。一次需要迁移很多代码,所以我更喜欢逐步迁移并有指标。考虑到这一点,我想创建一个装饰器来包装一些函数并知道它们阻塞事件循环多长时间。例如:

def measure_blocking_code(f):
    def wrapper(*args, **kwargs):
        # ?????
        # It should measure JUST 1 second
        # not 5 which is what the whole async function takes
    return wrapper

@measure_blocking_code
async def my_function():
    my_blocking_function()  # Takes 1 seconds
    await my_async_function()  # Takes 2 seconds
    await my_async_function_2()  # Takes 2 seconds

我知道事件循环有一个调试函数已经报告了这一点,但我需要获取特定函数的信息。

python performance asynchronous python-asyncio metrics
3个回答
4
投票

TLDR;

这个装饰器完成了这项工作:

def measure_blocking_code(f):
    async def wrapper(*args, **kwargs):
        t = 0
        coro = f()
        try:
            while True:
                t0 = time.perf_counter()
                future = coro.send(None)
                t1 = time.perf_counter()
                t += t1 - t0
                while not future.done():
                    await asyncio.sleep(0)
                future.result()  # raises exceptions if any
        except StopIteration as e:
            print(f'Function took {t:.2e} sec')
            return e.value
    return wrapper

说明

此解决方法利用了 cPython 中

asyncio
实现中使用的约定。这些约定是 PEP-492 的超集。换句话说:

  1. 您通常可以在不了解这些细节的情况下使用 async/await。
  2. 这可能不适用于其他异步库,例如 trio

asyncio
coro 对象(
coro
)可以通过调用
.send()
成员来执行。这只会运行阻塞代码,直到异步调用产生 Future 对象为止。仅通过测量
.send()
所花费的时间,就可以确定阻塞代码的持续时间。


0
投票

我终于找到方法了。我希望它对某人有帮助


import asyncio
import time


def measure(f):    
    async def wrapper(*args, **kwargs):
        coro_wrapper = f(*args, **kwargs).__await__()
        fut = asyncio.Future()

        total_time = 0
        def done(arg=None):
            try:
                nonlocal total_time
                start_time = time.perf_counter()
                next_fut = coro_wrapper.send(arg)
                end_time = time.perf_counter()
                total_time += end_time - start_time
                next_fut.add_done_callback(done)
            except StopIteration:
                fut.set_result(arg)
            except Exception as e:
                fut.set_exception(e)

        done()
        res = await fut
        print('Blocked for: ' + str(total_time) + ' seconds')
        return res

    return wrapper

0
投票

基于上面来自The SWE的代码,我做了一些改进:

def measure1(f):    
async def wrapper(*args, **kwargs):
    coroutine = f(*args, **kwargs).__await__()
    fut = asyncio.Future()
    s = time.perf_counter()

    def done(arg=None):
        try:
            next_ = coroutine.send(arg)
            next_.add_done_callback(done)
        except StopIteration as e:
            print(round(time.perf_counter() - s, 2))
            fut.set_result(e.value)
    done()

    return await fut
return wrapper
© www.soinside.com 2019 - 2024. All rights reserved.