Scipy最小化函数(仅用作示例),可以选择在每一步添加回调函数。所以我可以做点什么,
def my_callback(x):
print x
scipy.optimize.fmin(func, x0, callback=my_callback)
有没有办法使用回调函数来创建fmin的生成器版本,这样我才能做到,
for x in my_fmin(func,x0):
print x
似乎可能有一些产量和发送的组合,但我可以想到任何事情。
正如评论中所指出的,您可以使用Queue
在新线程中完成。缺点是你仍然需要一些方法来访问最终结果(fmin
最后返回的内容)。我下面的例子使用一个可选的回调来做一些事情(另一种选择就是产生它,尽管你的调用代码必须区分迭代结果和最终结果):
from thread import start_new_thread
from Queue import Queue
def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):
q = Queue() # fmin produces, the generator consumes
job_done = object() # signals the processing is done
# Producer
def my_callback(x):
q.put(x)
def task():
ret = scipy.optimize.fmin(func,x0,callback=my_callback)
q.put(job_done)
end_callback(ret) # "Returns" the result of the main call
# Starts fmin in a new thread
start_new_thread(task,())
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
更新:阻止执行下一次迭代,直到消费者完成处理最后一次迭代,还需要使用task_done
和join
。
# Producer
def my_callback(x):
q.put(x)
q.join() # Blocks until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
q.task_done() # Unblocks the producer, so a new iteration can start
请注意,maxsize=1
不是必需的,因为在消耗最后一个项目之前,不会将新项目添加到队列中。
更新2:另请注意,除非此生成器最终检索到所有项目,否则创建的线程将死锁(它将永久阻塞并且其资源永远不会被释放)。生产者正在等待队列,并且由于它存储了对该队列的引用,即使消费者是gc,它也永远不会被gc回收。然后队列将无法访问,因此没有人能够释放锁。
如果可能的话,一个干净的解决方案是未知的(因为它取决于fmin
所用的特定功能)。可以使用timeout
进行解决方法,如果put
阻塞太久,生产者会引发异常:
q = Queue(maxsize=1)
# Producer
def my_callback(x):
q.put(x)
q.put("dummy",True,timeout) # Blocks until the first result is retrieved
q.join() # Blocks again until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
q.task_done() # (one "task_done" per "get")
if next_item is job_done:
break
yield next_item
q.get() # Retrieves the "dummy" object (must be after yield)
q.task_done() # Unblocks the producer, so a new iteration can start
概念使用
maxsize=1
和生产者/消费者模型的阻塞队列。
回调产生,然后对回调的下一次调用将阻塞整个队列。
然后,使用者从队列中获取值,尝试获取另一个值,并在读取时阻塞。
生产者被允许推入队列,冲洗并重复。
用法:
def dummy(func, arg, callback=None):
for i in range(100):
callback(func(arg+i))
# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
print(i)
# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
print(i)
可以按预期用于迭代器:
for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
print(i)
迭代课:
from thread import start_new_thread
from Queue import Queue
class Iteratorize:
"""
Transforms a function that takes a callback
into a lazy iterator (generator).
"""
def __init__(self, func, ifunc, arg, callback=None):
self.mfunc=func
self.ifunc=ifunc
self.c_callback=callback
self.q = Queue(maxsize=1)
self.stored_arg=arg
self.sentinel = object()
def _callback(val):
self.q.put(val)
def gentask():
ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
self.q.put(self.sentinel)
if self.c_callback:
self.c_callback(ret)
start_new_thread(gentask, ())
def __iter__(self):
return self
def next(self):
obj = self.q.get(True,None)
if obj is self.sentinel:
raise StopIteration
else:
return obj
可能会做一些清理接受*args
和**kwargs
的功能被包装和/或最终结果回调。
让FakeFtp
使用retrbinary
函数,使用每次成功读取数据块时调用回调:
class FakeFtp(object):
def __init__(self):
self.data = iter(["aaa", "bbb", "ccc", "ddd"])
def login(self, user, password):
self.user = user
self.password = password
def retrbinary(self, cmd, cb):
for chunk in self.data:
cb(chunk)
使用简单的回调函数有一个缺点,即重复调用它,并且回调函数不能轻易地保持调用之间的上下文。
下面的代码定义了process_chunks
生成器,它将能够逐个接收数据块并进行处理。与简单的回调相比,这里我们能够将所有处理保持在一个函数中而不会丢失上下文。
from contextlib import closing
from itertools import count
def main():
processed = []
def process_chunks():
for i in count():
try:
# (repeatedly) get the chunk to process
chunk = yield
except GeneratorExit:
# finish_up
print("Finishing up.")
return
else:
# Here process the chunk as you like
print("inside coroutine, processing chunk:", i, chunk)
product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
processed.append(product)
with closing(process_chunks()) as coroutine:
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
print("processed result", processed)
print("DONE")
要查看代码的实际效果,请输入FakeFtp
类,上面显示的代码和以下行:
main()
到一个文件并调用它:
$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
processed = []
就在这里表示,发电机process_chunks
与其外部环境合作没有任何问题。所有都包含在def main():
中证明,没有必要使用全局变量。
def process_chunks()
是解决方案的核心。它可能有一个输入参数(这里没有使用),但主要点,它接收输入是每个yield
线返回任何人通过.send(data)
发送到这个生成器的实例。一个人可以coroutine.send(chunk)
,但在这个例子中,它是通过回调参考这个函数callback.send
完成的。
请注意,在实际解决方案中,在代码中有多个yield
s是没有问题的,它们是逐个处理的。这可以用于例如读取(并忽略)CSV文件的标题,然后继续处理包含数据的记录。
我们可以实例化并使用生成器,如下所示:
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
真正的代码使用contextlib
closing
上下文管理器来确保,总是调用coroutine.close()
。
这个解决方案没有提供一种迭代器来消费传统风格的“来自外部”的数据。另一方面,我们能够:
致谢:该解决方案深受用户响应Python FTP “chunk” iterator (without loading entire file into memory) 的启发,由user2357112编写
使用threading
和queue
的解决方案很好但不是pythonic,这是使用subprocess
的更好的方法(至少对我来说〜):
import pickle
import scipy
import select
import subprocess
def my_fmin(func, x0):
# open a process to use as a pipeline
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
def my_callback(x):
# x might be any object, not only str, so we use pickle to dump it
proc.stdin.write(pickle.dumps(x) + '\n')
scipy.optimize.fmin(func, x0, callback=my_callback)
# use select in case that the callback is asynchronous;
# otherwise, you can simply close proc.stdin and iterate over proc.stdout
while select.select([proc.stdout], [], [], 0)[0]:
yield pickle.loads(proc.stdout.readline()[:-1])
# close the process
proc.communicate()
然后你可以使用这样的函数:
for x in my_fmin(func, x0):
print x
怎么样
data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
print line
如果没有,您想要对发生器的数据做什么?