我试图从ISFTPFile
上的文件读取,我想避免在这种情况下使用@inlinceCallbacks
?
或者也许有更好的方式来读/写ISFTPFile
?
@defer.inlineCallbacks
def calculate_checksum(open_file):
hasher = hashlib.sha256()
offset = 0
try:
while True:
d = yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
hasher.update(d)
except EOFError:
pass
target_checksum = hasher.hexdigest()
defer.returnValue(target_checksum)
client_file = client.openFile(
filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)
您实际上想要在文件块的迭代器上映射sha256.update:
hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)
return hasher.hexdigest()
请注意,来自原始calculate_checksums
的显式迭代(使用while循环)现在隐藏在map
中。基本上,map
已经取代了迭代。
障碍是你想要避免将整个文件加载到内存中的read_those_chunks
(大概)。因此,作为第一步,实施该部分:
def read_those_chunks(open_file, chunk_size):
offset = 0
while True:
yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
有一个生成器产生Deferred
s,随后发布(或EOFError
)。不幸的是,你不能在map
上使用它。所以现在实现一个可以处理这个问题的地图:
def async_map(function, iterable):
try:
d = next(iterable)
except StopIteration:
return
d.addCallback(function)
d.addCallback(lambda ignored: async_map(function, iterable))
return d
由于async_map
将取代map
并且map
取代了原始实现中的迭代,async_map
仍然负责确保我们访问迭代中的每个块。然而,迭代(使用for
或while
)与Deferred
不能很好地混合(混合它们的时候通常会拉出inlineCallbacks
)。所以async_map
不会迭代。它recurses - 迭代的常见替代方案。每个递归调用都在迭代的下一个元素上运行,直到不再存在(或直到Deferred
失败,在这种情况下由于EOFError
会发生)。
递归比使用Deferred
的迭代效果更好,因为递归操作函数和函数调用。 Deferred
可以处理函数和函数调用 - 将函数传递给addCallback
,Deferred
最终会调用该函数。迭代由一小部分函数(有时称为“块”或“套件”)组成,而Deferred
无法处理这些。你无法将一个块传递给addCallback
。
现在使用这两个来创建一个在计算摘要时触发的Deferred
:
def calculate_checksum(open_file, chunk_size):
hasher = hashlib.sha256()
chunks = read_those_chunks(open_file, chunk_size)
d = async_map(hasher.update, chunks)
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda ignored: hasher.hexdigest())
return d
您可能还注意到async_map
与map
的不同之处在于它不会生成函数调用的结果列表。也许它更像reduce
:
def async_reduce(function, iterable, lhs):
try:
d = next(iterable)
except StopIteration:
return lhs
d.addCallback(lambda rhs: function(lhs, rhs))
d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
return d
当然,它仍然是递归的而不是迭代的。
并且用于计算hexdigest的缩减函数如下:
def update_hash(hasher, s):
hasher.update(s)
return hasher
所以calculate_checksum
成为:
def calculate_checksum(open_file, chunk_size):
chunks = read_those_chunks(open_file, chunk_size)
d = async_reduce(update_hash, hashlib.sha256(), "")
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda hasher: hasher.hexdigest())
return d
没有hasher
关闭更好一点。
当然,还有许多其他方法可以重写此功能以避免inlineCallbacks
。我选择的方式并没有消除使用生成器功能,所以如果你想要逃脱它并没有真正帮助。如果是这样,也许你可以像我在这里做的那样将问题分解成不同的部分,其中没有一部分涉及发电机。