从协程中提取函数和参数

问题描述 投票:0回答:3

python3.6中是否可以提取协程对象的函数和参数?

上下文:目前我有这样的东西:

async def func(*args):
    ...

ret = await remotely(func, x, y)

在引擎盖下,

remotely
pickles
func
x
y
,scp将其发送到不同的服务器,在那里它对它们进行unpickle,执行
func(x,y)
,pickles结果,scp将其返回,最后unpickle变成
ret

这个 API 让我觉得不舒服,我更喜欢:

ret = await remotely(func(x, y))

如果我可以腌制由

func(x, y)
表示的协程对象,我就可以做到这一点,但是当我尝试这样做时,我得到:

TypeError: can't pickle coroutine objects

所以我的另一个希望是我可以从

f
中提取
x
y
f(x, y)
,因此产生了这个问题。

python-asyncio python-3.6
3个回答
7
投票

因此,当您执行

ret = await remotely(func(x, y))
时,您实际上是为
func
构建了协程对象。幸运的是,您可以从协程对象中提取所需的信息,然后将其发送出去以进行远程执行。

首先,您可以使用

__qualname__
属性获取函数名称。这将为您提供完全限定的名称,即如果协程是嵌套的,它将为您提供函数的完整路径。

接下来,您可以从协程的框架对象中提取参数值。

这就是你的

remote
函数的样子

async def remote(cr):
    # Get the function name
    fname = cr.__qualname__

    # Get the argument values
    frame = cr.cr_frame
    args = frame.f_locals  # dict object

    result = await ...  # your scp stuff

    return result

只有一个警告。您应该指出该功能只能按照您发布的方式使用,即

ret = await remotely(func(x, y))

...换句话说,协程应该是“新鲜的”,而不是半途执行(如果您在将其传递给

remote
之前启动它,这几乎是不可能的)。否则,
f_locals
值可能包含在任何
await
之前定义的任何其他局部变量。


0
投票

没有找到干净的解决方案。但我们可以找到函数作为协程代码对象的引用:

import gc
import inspect

def get_function_from_coroutine(coroutine: Coroutine) -> Callable:
    referrers = gc.get_referrers(coroutine.cr_code)
    return next(filter(lambda ref: inspect.isfunction(ref), referrers))

def get_kwargs_from_coroutine(coroutine: Coroutine) -> dict[str, Any]:
    return coroutine.cr_frame.f_locals

async def foo(a: str, b: int):
    return a * b

coro = foo("test", b=2)
print(get_function_from_coroutine(coro))  # <function foo at 0x7ff61ece9820>
print(get_kwargs_from_coroutine(coro))  # {'a': 'test', 'b': 2}

0
投票

备注:

  • 该函数不进行酸洗,但可以对它的结果进行酸洗
  • 据我所知,此方法没有记录与协程一起使用,并且 mypy 抛出的打字错误表明它不应该工作。但它确实如此(并且通过
    cast
    ing 它确实通过了 mypy 检查)。

假设您使用的是 python 3.10(甚至早至 3.3——不过我还没有测试过),这应该会给您一个可以 pickle 的函数签名。如果您使用基元,也可以将其填充到 JSON 对象中。

import inspect
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import cast

def autograph(f: Callable | Awaitable, a: Sequence[Any], k: Mapping[str, Any]) -> str, OrderedDict:
    sig = inspect.signature(cast(Callable, f)). # this appeases mypy but it is a lie
    bound_args = sig.bind(*a, **k)
    bound_args.apply_defaults()
    return f.__qualname__, bound_args.arguments

根据您的示例,您可以像这样创建函数调用

async def remotely(func, arguments, keyword_arguments):
    fname, parameters = autograph(f=func, a=arguments, k=keyword_arguments)
    # ... do any pickling/marshaling/etc
    await # ... some long io call
    return # ... whatever it is you return

可以这样使用(假设您用合法值替换

...
):

result = await remotely(func=..., arguments=..., keyword_arguments=...)
© www.soinside.com 2019 - 2024. All rights reserved.