我正在尝试计算目录中的所有标记。由于文档量很大,我想使用多重处理(或您可以随意提及的任何其他并行计算工具)来进行此计数。我的问题是幼稚的构造不起作用。
这是我构建的最小示例
import multiprocessing
import random
def count_tokens(document):
counter = dict()
for token in document:
if token in counter:
counter[token] += 1
else:
counter[token] = 1
return counter
tokens = ['tok'+str(i) for i in range(int(9))]
catalog = [random.choices(tokens, k=8) for _ in range(100)]
token_counts = {token: 0 for token in tokens}
def callback(result):
global token_counts
for token, count in result.items():
token_counts[token] += count
return token_counts
with multiprocessing.Pool() as pool:
for document in catalog:
pool.apply_async(count_tokens, args=(document,), callback=callback)
问题是返回的
token_counts
与非并行计算不一样
token_counts = {token: 0 for token in tokens}
for document in catalog:
callback(count_tokens(document))
可以肯定的是,我构建了完整的脚本
import multiprocessing
import random
def count_tokens(document):
counter = dict()
for token in document:
if token in counter:
counter[token] += 1
else:
counter[token] = 1
return counter
tokens = ['tok'+str(i) for i in range(int(9))]
catalog = [random.choices(tokens, k=8) for _ in range(100)]
token_counts = {token: 0 for token in tokens}
def callback(result):
global token_counts
for token, count in result.items():
token_counts[token] += count
return token_counts
with multiprocessing.Pool() as pool:
for document in catalog:
pool.apply_async(count_tokens, args=(document,), callback=callback)
count_multiprocessing = dict(**token_counts)
print(count_multiprocessing)
token_counts = {token: 0 for token in tokens}
for document in catalog:
callback(count_tokens(document))
count_onecpu = dict(**token_counts)
print(count_onecpu)
for token, count in count_onecpu.items():
assert count == count_multiprocessing[token]
for token, count in count_multiprocessing.items():
assert count == count_onecpu[token]
在家里总是会出现断言错误(Python 3.10.9,如果这很重要的话)。
您可能已经雇用了 默认字典(int)。 但更明显的事情是 “从集合导入 柜台”。 然后
def count_tokens(document):
counter = dict()
for token in document:
if token in counter:
counter[token] += 1
else:
counter[token] = 1
return counter
会变成简单的
return Counter(document)
。
您的
callback()
未获取锁。
所以{读,增量,写}结果并不稳健。
您希望池序列化短路径名 发送到工作进程,然后工作进程将 open() 文件并读取一个巨大的文档。 你不做什么 希望父进程保留巨大的文档, 然后必须费力地序列化并通过管道发送 到每个工作子进程。