带有回调的Python多处理apply_async不会更新字典数据结构

问题描述 投票:0回答:1

我正在尝试计算目录中的所有标记。由于文档量很大,我想使用多重处理(或您可以随意提及的任何其他并行计算工具)来进行此计数。我的问题是幼稚的构造不起作用。

这是我构建的最小示例

import multiprocessing
import random

def count_tokens(document):
    counter = dict()
    for token in document:
        if token in counter:
            counter[token] += 1
        else:
            counter[token] = 1
    return counter

tokens = ['tok'+str(i) for i in range(int(9))]
catalog = [random.choices(tokens, k=8) for _ in range(100)]

token_counts = {token: 0 for token in tokens}

def callback(result):
    global token_counts
    for token, count in result.items():
        token_counts[token] += count
    return token_counts

with multiprocessing.Pool() as pool:
    for document in catalog:
        pool.apply_async(count_tokens, args=(document,), callback=callback)

问题是返回的

token_counts
与非并行计算不一样

token_counts = {token: 0 for token in tokens}
for document in catalog:
    callback(count_tokens(document))

可以肯定的是,我构建了完整的脚本

import multiprocessing
import random

def count_tokens(document):
    counter = dict()
    for token in document:
        if token in counter:
            counter[token] += 1
        else:
            counter[token] = 1
    return counter

tokens = ['tok'+str(i) for i in range(int(9))]
catalog = [random.choices(tokens, k=8) for _ in range(100)]

token_counts = {token: 0 for token in tokens}

def callback(result):
    global token_counts
    for token, count in result.items():
        token_counts[token] += count
    return token_counts

with multiprocessing.Pool() as pool:
    for document in catalog:
        pool.apply_async(count_tokens, args=(document,), callback=callback)
        
count_multiprocessing = dict(**token_counts)
print(count_multiprocessing)

token_counts = {token: 0 for token in tokens}
for document in catalog:
    callback(count_tokens(document))

count_onecpu = dict(**token_counts)
print(count_onecpu)

for token, count in count_onecpu.items():
    assert count == count_multiprocessing[token]
for token, count in count_multiprocessing.items():
    assert count == count_onecpu[token]

在家里总是会出现断言错误(Python 3.10.9,如果这很重要的话)。

python python-multiprocessing
1个回答
0
投票

柜台

您可能已经雇用了 默认字典(int)。 但更明显的事情是 “从集合导入 柜台”。 然后

def count_tokens(document):
    counter = dict()
    for token in document:
        if token in counter:
            counter[token] += 1
        else:
            counter[token] = 1
    return counter

会变成简单的

return Counter(document)

互斥体

您的

callback()
未获取锁。 所以{读,增量,写}结果并不稳健。

开销

您希望池序列化短路径名 发送到工作进程,然后工作进程将 open() 文件并读取一个巨大的文档。 你做什么 希望父进程保留巨大的文档, 然后必须费力地序列化并通过管道发送 到每个工作子进程。

© www.soinside.com 2019 - 2024. All rights reserved.