如何从SFTPFile重构readChunk以停止使用inlineCallbacks?

问题描述 投票:0回答:1

我试图从ISFTPFile上的文件读取,我想避免在这种情况下使用@inlinceCallbacks

或者也许有更好的方式来读/写ISFTPFile

@defer.inlineCallbacks
def calculate_checksum(open_file):
    hasher = hashlib.sha256()

    offset = 0
    try:
        while True:
            d = yield open_file.readChunk(offset, chunk_size)
            offset += chunk_size
            hasher.update(d)

    except EOFError:
        pass

    target_checksum = hasher.hexdigest()
    defer.returnValue(target_checksum)


client_file = client.openFile(
    filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)
python twisted sftp
1个回答
1
投票

您实际上想要在文件块的迭代器上映射sha256.update:

hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)
return hasher.hexdigest()

请注意,来自原始calculate_checksums的显式迭代(使用while循环)现在隐藏在map中。基本上,map已经取代了迭代。

障碍是你想要避免将整个文件加载到内存中的read_those_chunks(大概)。因此,作为第一步,实施该部分:

def read_those_chunks(open_file, chunk_size):
    offset = 0
    while True:
        yield open_file.readChunk(offset, chunk_size)
        offset += chunk_size

有一个生成器产生Deferreds,随后发布(或EOFError)。不幸的是,你不能在map上使用它。所以现在实现一个可以处理这个问题的地图:

def async_map(function, iterable):
    try:
        d = next(iterable)
    except StopIteration:
        return

    d.addCallback(function)
    d.addCallback(lambda ignored: async_map(function, iterable))
    return d

由于async_map将取代map并且map取代了原始实现中的迭代,async_map仍然负责确保我们访问迭代中的每个块。然而,迭代(使用forwhile)与Deferred不能很好地混合(混合它们的时候通常会拉出inlineCallbacks)。所以async_map不会迭代。它recurses - 迭代的常见替代方案。每个递归调用都在迭代的下一个元素上运行,直到不再存在(或直到Deferred失败,在这种情况下由于EOFError会发生)。

递归比使用Deferred的迭代效果更好,因为递归操作函数和函数调用。 Deferred可以处理函数和函数调用 - 将函数传递给addCallbackDeferred最终会调用该函数。迭代由一小部分函数(有时称为“块”或“套件”)组成,而Deferred无法处理这些。你无法将一个块传递给addCallback

现在使用这两个来创建一个在计算摘要时触发的Deferred

def calculate_checksum(open_file, chunk_size):
    hasher = hashlib.sha256()
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_map(hasher.update, chunks)
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda ignored: hasher.hexdigest())
    return d

您可能还注意到async_mapmap的不同之处在于它不会生成函数调用的结果列表。也许它更像reduce

def async_reduce(function, iterable, lhs):
    try:
        d = next(iterable)
    except StopIteration:
        return lhs

    d.addCallback(lambda rhs: function(lhs, rhs))
    d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
    return d

当然,它仍然是递归的而不是迭代的。

并且用于计算hexdigest的缩减函数如下:

def update_hash(hasher, s):
    hasher.update(s)
    return hasher

所以calculate_checksum成为:

def calculate_checksum(open_file, chunk_size):
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_reduce(update_hash, hashlib.sha256(), "")
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda hasher: hasher.hexdigest())
    return d

没有hasher关闭更好一点。

当然,还有许多其他方法可以重写此功能以避免inlineCallbacks。我选择的方式并没有消除使用生成器功能,所以如果你想要逃脱它并没有真正帮助。如果是这样,也许你可以像我在这里做的那样将问题分解成不同的部分,其中没有一部分涉及发电机。

© www.soinside.com 2019 - 2024. All rights reserved.