Python: possible data loss with multiprocessing.Queue()


Suppose I have the following example, where I create a daemon process and try to communicate with it via an event flag:

from multiprocessing import Process, Event, Queue
import time

def reader(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]

    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()         # Read from the queue 
            output_queue.put(msg)     # copy to output_queue
            if (msg == 'DONE'):  # signal to stop              
                e.set() # signal that worker is done


def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    input_queue = Queue()   # reader() reads from queue
                          # writer() writes to queue

    output_queue = Queue()


    e = Event()
    e.set()

    reader_p = Process(target=reader, args=((input_queue, e, output_queue),))
    reader_p.daemon = True
    reader_p.start()        # Launch reader() as a separate python process

    for count in [10**4, 10**5, 10**6]:

        _start = time.time()
        writer(count, input_queue)    # Send a lot of stuff to reader()

        e.clear()  # clear the event, signalling the worker to start

        e.wait()   # wait for the reader to finish

        # fetch results from output_queue:
        results = []
        while not output_queue.empty():
            results += [output_queue.get()]

        print(len(results)) # check how many results we have

        print("Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start)))

I use an input and an output queue; in this example the worker simply copies the data to the output queue, and I fetch it later in the program. Everything seems fine until the data length reaches 10k (is that actually a queue size limit, in bytes?), but when I try to copy more elements I get back a seemingly random number of results, far fewer than were sent:

10001
Sending 10000 numbers to Queue() took 0.4259309768676758 seconds
18857
Sending 100000 numbers to Queue() took 1.693629503250122 seconds
12439
Sending 1000000 numbers to Queue() took 10.592029809951782 seconds

10001
Sending 10000 numbers to Queue() took 0.41446948051452637 seconds
46615
Sending 100000 numbers to Queue() took 1.9259979724884033 seconds
18623
Sending 1000000 numbers to Queue() took 10.06524133682251 seconds

Update: now I have tried to share the data between three workers. I have checked that all of them do work, but the data loss has not stopped:

import multiprocessing
from multiprocessing import Process, Event, Queue
import time

def reader(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]


    while True:
        if not e.is_set(): # if there is a signal to start
            #if not output_queue.empty(): # hangs for some reason
            msg = input_queue.get()   # Read from the queue
            output_queue.put(msg)     # copy to output_queue
            #print("1")
            if (msg == 'DONE'):  # signal to stop
                e.set() # signal that there is no more data
                print("done")


def reader1(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]


    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()   # Read from the queue
            output_queue.put(msg)     # copy to output_queue
            #print("2")
            if (msg == 'DONE'):  # signal to stop
                e.set() # signal that there is no more data
                print("done")


def reader2(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]

    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()   # Read from the queue
            output_queue.put(msg)     # copy to output_queue
            #print("3")
            if (msg == 'DONE'):  # signal to stop
                e.set() # signal that there is no more data
                print("done")


def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':

    # I do not use manager, as it makes everything extremely slow
    #m = multiprocessing.Manager()
    #input_queue = m.Queue()

    input_queue = Queue()   # reader() reads from queue
                          # writer() writes to queue

    output_queue = Queue()


    e = Event()
    e.set()

    reader_p = Process(target=reader, args=((input_queue, e, output_queue),))
    reader_p.daemon = True
    reader_p.start()        # Launch reader() as a separate python process

    reader_p1 = Process(target=reader1, args=((input_queue, e, output_queue),))
    reader_p1.daemon = True
    reader_p1.start() 

    reader_p2 = Process(target=reader2, args=((input_queue, e, output_queue),))
    reader_p2.daemon = True
    reader_p2.start() 

    for count in [10**4, 10**5, 10**6]:

        _start = time.time()
        writer(count, input_queue)    # Send a lot of stuff to readers

        e.clear()  # clear the event, signalling the workers to start

        e.wait()   # wait for a reader to finish

        # fetch results from output_queue:
        results = []
        while not output_queue.empty():
            results += [output_queue.get()]

        print(len(results)) # check how many results we have

        print("Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start)))

As a result, the second stage sometimes completes correctly:

done
10001
Sending 10000 numbers to Queue() took 0.37468671798706055 seconds
done
18354
Sending 100000 numbers to Queue() took 1.2723915576934814 seconds
done
34807
Sending 1000000 numbers to Queue() took 9.1871018409729 seconds

done
10001
Sending 10000 numbers to Queue() took 0.37137532234191895 seconds
done
100001
Sending 100000 numbers to Queue() took 2.5747978687286377 seconds
done
217034
Sending 1000000 numbers to Queue() took 12.640174627304077 seconds
Tags: python, python-3.x, python-multiprocessing
1 Answer

There is indeed a limit on queue size: in multiprocessing this limit is not strictly reliable, and once it is reached, queue.put blocks until space is freed in the queue. See the documentation for more information: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue
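For illustration, here is a minimal sketch (my own, not from the original post) of that blocking behavior with an explicit maxsize; the bound and the timeout are arbitrary values chosen only for the demo:

import queue
from multiprocessing import Queue

q = Queue(maxsize=2)       # small explicit bound, chosen only for demonstration
q.put("a")
q.put("b")                 # the queue is now full
try:
    q.put("c", timeout=1)  # blocks waiting for free space; no one is reading
except queue.Full:
    print("put() timed out because the queue stayed full")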

In your case, though, this is not the problem. You have simply defined a bad condition for stopping the fetching of results:

while not output_queue.empty():
     results += [output_queue.get()]

In your case, if the writer is slower than the reader (and from time to time it will be), the queue may be momentarily empty even though the writer has not yet sent everything. That is why the number of results you read back fluctuates.

To confirm this, I replaced that condition with:

t0 = time.time()
while time.time() - t0 < 30: # long enough to complete these loops, but only a demo condition; you should not use this in real code
    try:
        # wait up to 1 second if the queue is momentarily empty
        results += [output_queue.get(timeout=1)]
    except Exception:
        # the queue stayed empty for more than 1 second, so assume the loop is
        # complete (again, not a good condition in real life; just for testing)
        break
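A sturdier alternative (my own sketch, not part of the original answer) is to drain by expected count: the main process knows it sent count numbers plus the 'DONE' sentinel, so it can block on exactly that many get() calls and no result can be missed:

results = []
for _ in range(count + 1):             # 'count' numbers plus the 'DONE' sentinel
    results.append(output_queue.get()) # blocks until each item actually arrives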