通过回调将函数转换为Python生成器?

问题描述 投票:28回答:5

Scipy最小化函数(仅用作示例),可以选择在每一步添加回调函数。所以我可以做点什么,

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

有没有办法使用回调函数来创建fmin的生成器版本,这样我才能做到,

for x in my_fmin(func,x0):
    print x

似乎可能有一些产量和发送的组合,但我可以想到任何事情。

python generator coroutine
5个回答
15
投票

正如评论中所指出的,您可以使用Queue在新线程中完成。缺点是你仍然需要一些方法来访问最终结果(fmin最后返回的内容)。我下面的例子使用一个可选的回调来做一些事情(另一种选择就是产生它,尽管你的调用代码必须区分迭代结果和最终结果):

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item

更新:阻止执行下一次迭代,直到消费者完成处理最后一次迭代,还需要使用task_donejoin

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start

请注意,maxsize=1不是必需的,因为在消耗最后一个项目之前,不会将新项目添加到队列中。

更新2:另请注意,除非此生成器最终检索到所有项目,否则创建的线程将死锁(它将永久阻塞并且其资源永远不会被释放)。生产者正在等待队列,并且由于它存储了对该队列的引用,即使消费者是gc,它也永远不会被gc回收。然后队列将无法访问,因此没有人能够释放锁。

如果可能的话,一个干净的解决方案是未知的(因为它取决于fmin所用的特定功能)。可以使用timeout进行解决方法,如果put阻塞太久,生产者会引发异常:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start

6
投票

概念使用maxsize=1和生产者/消费者模型的阻塞队列。

回调产生,然后对回调的下一次调用将阻塞整个队列。

然后,使用者从队列中获取值,尝试获取另一个值,并在读取时阻塞。

生产者被允许推入队列,冲洗并重复。

用法:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
   print(i)

可以按预期用于迭代器:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)

迭代课:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
     raise StopIteration 
    else:
      return obj

可能会做一些清理接受*args**kwargs的功能被包装和/或最终结果回调。


6
投票

Generator as coroutine (no threading)

FakeFtp使用retrbinary函数,使用每次成功读取数据块时调用回调:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

使用简单的回调函数有一个缺点,即重复调用它,并且回调函数不能轻易地保持调用之间的上下文。

下面的代码定义了process_chunks生成器,它将能够逐个接收数据块并进行处理。与简单的回调相比,这里我们能够将所有处理保持在一个函数中而不会丢失上下文。

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")

要查看代码的实际效果,请输入FakeFtp类,上面显示的代码和以下行:

main()

到一个文件并调用它:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

这个怎么运作

processed = []就在这里表示,发电机process_chunks与其外部环境合作没有任何问题。所有都包含在def main():中证明,没有必要使用全局变量。

def process_chunks()是解决方案的核心。它可能有一个输入参数(这里没有使用),但主要点,它接收输入是每个yield线返回任何人通过.send(data)发送到这个生成器的实例。一个人可以coroutine.send(chunk),但在这个例子中,它是通过回调参考这个函数callback.send完成的。

请注意,在实际解决方案中,在代码中有多个yields是没有问题的,它们是逐个处理的。这可以用于例如读取(并忽略)CSV文件的标题,然后继续处理包含数据的记录。

我们可以实例化并使用生成器,如下所示:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()

真正的代码使用contextlib closing上下文管理器来确保,总是调用coroutine.close()

Conclusions

这个解决方案没有提供一种迭代器来消费传统风格的“来自外部”的数据。另一方面,我们能够:

  • 使用发电机“从里面”
  • 将所有迭代处理保持在一个函数内,而不会在回调之间中断
  • 可选择使用外部上下文
  • 向外界提供有用的结果
  • 所有这些都可以在不使用线程的情况下完成

致谢:该解决方案深受用户响应Python FTP “chunk” iterator (without loading entire file into memory) 的启发,由user2357112编写


1
投票

A more pythonic way

使用threadingqueue的解决方案很好但不是pythonic,这是使用subprocess的更好的方法(至少对我来说〜):

import pickle
import scipy
import select
import subprocess

def my_fmin(func, x0):
    # open a process to use as a pipeline
    proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def my_callback(x):
        # x might be any object, not only str, so we use pickle to dump it
        proc.stdin.write(pickle.dumps(x) + '\n')

    scipy.optimize.fmin(func, x0, callback=my_callback)

    # use select in case that the callback is asynchronous;
    # otherwise, you can simply close proc.stdin and iterate over proc.stdout
    while select.select([proc.stdout], [], [], 0)[0]:
        yield pickle.loads(proc.stdout.readline()[:-1])

    # close the process
    proc.communicate()

然后你可以使用这样的函数:

for x in my_fmin(func, x0):
    print x

0
投票

怎么样

data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
    print line

如果没有,您想要对发生器的数据做什么?

© www.soinside.com 2019 - 2024. All rights reserved.