Problem trying to get two child processes to share the load of processing the same resource

Question · Votes: 0 · Answers: 1

I've been experimenting with Python's multiprocessing module, but things aren't working the way I expected, so now I'm a bit confused.

In a Python script, I create two child processes so that they can consume the same resource. I assumed they would more or less "share" the load, but that doesn't seem to happen: one of the processes runs only once, while the other handles almost everything.

To test this, I wrote the following code:

#!/usr/bin/python

import os
import multiprocessing

# Worker function
def worker(queueA, queueB):
    while(queueA.qsize() != 0):
        item = queueA.get()
        item = "item: " + item + ". processed by worker " + str(os.getpid())
        queueB.put(item)
    return

# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()

# Fill queueA with data
for i in range(0, 10):
    queueA.put("hello" + str(i+1))

# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))

# Call processes
process1.start()
process2.start()

# Wait for processes to stop processing
process1.join()
process2.join()

for i in range(0, queueB.qsize()):
    print(queueB.get())

This prints the following:

item: hello1. processed by worker 11483
item: hello3. processed by worker 11483
item: hello4. processed by worker 11483
item: hello5. processed by worker 11483
item: hello6. processed by worker 11483
item: hello7. processed by worker 11483
item: hello8. processed by worker 11483
item: hello9. processed by worker 11483
item: hello10. processed by worker 11483
item: hello2. processed by worker 11482

As you can see, one of the processes works on only a single item and never gets any more elements from the queue, while the other has to process everything else.

I don't think this is right, or at least it's not what I expected. Could you tell me the correct way to implement this idea?

python process multiprocessing python-multiprocessing
1 Answer

1 vote

You're right that they won't split the work perfectly evenly, but that's mostly because your test sample is so small. Each process takes some time to start up and begin working, and the time needed to process one item from the queue is extremely short, so one worker can easily process 9 items before the other gets through a single one.

I tested the following (in Python 3, but it should also work on 2.7 if you change the print() function to a print statement):

import os
import multiprocessing

# Worker function
def worker(queueA, queueB):
    for item in iter(queueA.get, 'STOP'):
        out = str(os.getpid())
        queueB.put(out)
    return

# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()

# Fill queueA with data
for i in range(0, 1000):
    queueA.put("hello" + str(i+1))

# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))

# Call processes
process1.start()
process2.start()

queueA.put('STOP')
queueA.put('STOP')

# Wait for processes to stop processing
process1.join()
process2.join()

all = {}
for i in range(1000):
    item = queueB.get()
    if item not in all:
        all[item] = 1
    else:
        all[item] += 1
print(all)

My output (how many items each process completed):

{'18376': 537, 
 '18377': 463}

While they aren't exactly equal, they approach parity as the run gets longer.

Edit: Another way to confirm this is to add a time.sleep(3) to the worker function:

import time

def worker(queueA, queueB):
    for item in iter(queueA.get, 'STOP'):
        time.sleep(3)
        out = str(os.getpid())
        queueB.put(out)
    return

Running the test with range(10), as in your original example, I got:

{'18428': 5,
 '18429': 5}
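As a side note, the `while queueA.qsize() != 0` loop in the original question is also racy: another worker can drain the queue between the qsize() check and the get(), leaving a process blocked forever on an empty queue. If you'd rather not use a STOP sentinel, one alternative sketch is to use get_nowait() and catch the Empty exception:

```python
import os
import queue  # provides the Empty exception raised by get_nowait()

def worker(queueA, queueB):
    # queueA/queueB are expected to be multiprocessing.Queue objects.
    # Draining with get_nowait() closes the race window between a
    # qsize()/empty() check and a competing get(): if another process
    # steals the last item first, we simply exit the loop.
    while True:
        try:
            item = queueA.get_nowait()
        except queue.Empty:
            break
        queueB.put("item: %s. processed by worker %d" % (item, os.getpid()))
```

One caveat: with multiprocessing.Queue, Empty can also be raised transiently while items are still in flight between processes, so the sentinel approach shown in the answer is the more robust of the two.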