我正在尝试编写一个带有上下文管理器的多线程助手。这个想法是在一个块内定义一堆函数,上下文管理器“神奇地”负责调度和一切。简化的工作版本如下所示:
import contextlib
@contextlib.contextmanager
def multi_threaded(count):
funcs = []
yield funcs
my_slice = int(count / len(funcs))
for i, func in enumerate(funcs):
start = my_slice * i
func(start, start + my_slice)
def spawn_many():
dataset = [1, 2, 3, 4, 5]
with multi_threaded(len(dataset)) as mt:
def foo(start_idx, end):
print("foo" + str(dataset[start_idx : end]))
def bar(start_idx, end):
print("bar" + str(dataset[start_idx : end]))
mt.append(foo)
mt.append(bar)
spawn_many()
这个例子有效,但我想去掉这些行:
mt.append(foo)
mt.append(bar)
这样用户只需要定义函数而不需要将它们添加到集合中。为什么?因为它不太容易出错,而且我无法控制用这个库编写的代码。
问题是,在yield之后,我超出了
def foo
发生的范围,所以我不知道该范围中存在的locals()
,这基本上就是我需要知道在那里定义了哪些函数。有什么想法/技巧/鼓励的话吗?
感谢您的阅读!
装饰器可能会好一点:
import contextlib
@contextlib.contextmanager
def multi_threaded(count):
funcs = []
yield funcs
my_slice = int(count / len(funcs))
for i, func in enumerate(funcs):
start = my_slice * i
func(start, start + my_slice)
def add_to_flist(mt):
def _add_to_flist(func):
mt.append(func)
return func
return _add_to_flist
def spawn_many():
dataset = [1, 2, 3, 4, 5]
with multi_threaded(len(dataset)) as mt:
@add_to_flist(mt)
def foo(start_idx, end):
print("foo" + str(dataset[start_idx : end]))
@add_to_flist(mt)
def bar(start_idx, end):
print("bar" + str(dataset[start_idx : end]))
spawn_many()
我读到这是不可能的,至少不是没有丑陋的黑客,但我认为我的解决方案最终并没有那么丑陋:
您在创建时将
locals()
字典传递到上下文管理器中,上下文管理器在生成后询问该字典,以收集任何可调用对象:
@contextlib.contextmanager
def multi_threaded(block_locals, count):
yield
funcs = [fn for fn in block_locals.values() if callable(fn)]
my_slice = int(count / len(funcs))
for i, func in enumerate(funcs):
start = my_slice * i
func(start, start + my_slice)
def spawn_many():
dataset = [1, 2, 3, 4, 5]
with multi_threaded(locals(), len(dataset)):
def foo(start_idx, end):
print("foo" + str(dataset[start_idx : end]))
def bar(start_idx, end):
print("bar" + str(dataset[start_idx : end]))
# Re-sync locals-dict handed earlier to multi_threaded().
locals()
spawn_many()
请注意,该技巧之所以有效,是因为块中最后一次调用了
locals()
。看来 Python 仅在调用 locals()
时才同步 <-->locals()
字典函数局部变量。如果没有最后一次通话,
multi_threaded
就会将 {'dataset': [1, 2, 3, 4, 5]}
视为当地人。
调用者的
locals()
字典可作为调用者框架的 f_locals
属性使用,您可以通过调用 sys._getframe(1)
来获取该属性,因此您可以保存调用者框架的 f_locals
的副本在上下文管理器的 __enter__
方法中,然后在 __exit__
方法中,与保存的副本相比,添加到调用者框架 f_locals
的键将是在 with
块中定义的函数:
import sys
class multi_threaded:
def __init__(self, count):
self.count = count
def __enter__(self):
self.old_locals = sys._getframe(1).f_locals.copy()
def __exit__(self, exc_type, exc_val, exc_tb):
new_locals = sys._getframe(1).f_locals
funcs = list(map(new_locals.get, new_locals.keys() - self.old_locals.keys()))
my_slice = int(self.count / len(funcs))
for i, func in enumerate(funcs):
start = my_slice * i
func(start, start + my_slice)
def spawn_many():
dataset = [1, 2, 3, 4, 5]
with multi_threaded(len(dataset)):
def foo(start_idx, end):
print("foo" + str(dataset[start_idx : end]))
def bar(start_idx, end):
print("bar" + str(dataset[start_idx : end]))
spawn_many()
输出:
foo[1, 2]
bar[3, 4]