Python多处理:如何从一组进程中用列表的下一个元素再次运行一个进程?

问题描述 投票:0回答:1

我有一个包含表名的列表,比如说列表的大小是n。现在我有m个服务器,所以我打开了m个游标,每个游标也在另一个列表中。现在我想为每个表调用一个函数,它的参数是这两个列表。

templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]

这些游标以cur = conn.cursor()的形式打开,所以这些是对象。

def extract_single(tableName, cursorconn):
      qry2 = "Select * FROM %s"% (tableName)
      cursorconn.execute(qry2).fetchall()
      print " extraction done"
      return 

现在我已经打开了5个进程(因为我有5个游标),以便让它们并行运行。

processes = []

x = 0
for x in range(5):
   new_p = 'p%x'%x
   print "process :", new_p
   new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
   new_p.start()
   processes.append(new_p)


for process in processes:
    process.join()

现在我想,一旦这5个进程中的任何一个进程完成,它就应该立即从我的templst中取出第6个表,同样的事情一直持续到所有的templst完成。

如何修改这段代码来实现这种行为?举个简单的例子,我想做什么。让我们把一个templst看成一个int,我想为它调用sleep函数。

templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]

def extract_single(sec, cursorconn):
      print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
      time.sleep(sec)
      print " sleeping done"
      return

所以,当我启动5个游标时,有可能sleep(1)或sleep(2)先完成,所以当它完成后,我想对该游标运行sleep(3)。

我的真正的查询将依赖于游标,因为它将是SQL查询。

修改后的办法考虑到前面的睡眠例子,我现在想实现的是,假设有10个游标,我的睡眠队列是按递增或递减的顺序排列的。现在我想实现的是,假设有10个游标,我的睡眠队列是按递增或递减的顺序排列的。考虑到列表中的递增顺序,现在10个游标中,前5个游标将取队列中的前5个元素,而我的另一组5个游标将取最后5个元素,所以基本上我的游标队列被分成了两半,一半取最低值,另一半取最高值。 所以基本上我的游标队列分为两半,一半取最低值,另一半取最高值,如果第一半的游标完成了,它应该取下一个最低值,如果第二半的游标完成了,它应该取(n-6)个值,即从终点开始取6个值。

我需要从两边穿越队列,并且有两组游标,每组5个,每组6个。

example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
         curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
        templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

so cur1 -> 1
   cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12

现在cur1先完成,所以需要6个(第一个从前面开始的可用元素)cur2完成,需要7个,以此类推,如果cur10完成,将需要11个(下一个从后面开始的可用元素

以此类推,直到templst的所有元素。

python multiprocessing python-multiprocessing pyodbc
1个回答
0
投票

把你的 templst 参数,无论是真实例子中的表名还是下面例子中的休眠秒数,都是在一个多进程队列上。然后每个进程循环读取队列中的下一个项目。当队列空了,就没有更多的工作要做,你的进程就可以返回了。实际上,你已经实现了自己的进程池,每个进程都有自己的专用游标连接。现在我们的函数 extract_single 以队列作为第一个参数,从队列中检索表名或秒参数。

import multiprocessing
import Queue
import time

def extract_single(q, cursorconn):
    while True:
        try:
            sec = q.get_nowait()
            print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            time.sleep(sec)
            print " sleeping done"
        except Queue.Empty:
            return

def main():
    q = multiprocessing.Queue()
    templst = [1,2,5,7,4,3,6,8,9,10,11]
    for item in templst:
        q.put(item) # add items to queue
    curlst = [cur1,cur2,cur3,cur4,cur5]
    process = []
    for i in xrange(5):
        p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
        process.append(p)
        p.start()
    for p in process:
        p.join()

if __name__ == '__main__':
    main()

注释

如果你的处理器少于5个,你可以尝试用5个(或更多)线程来运行,在这种情况下,一个普通的 Queue 应使用对象。

更新问题的答案

允许你从队列前部和队列后部删除项目的数据结构被称为一个 辞令 (双端队列)。不幸的是,目前还没有支持多处理的deque版本。但我认为,你的表处理可能和线程一样好用,反正你的计算机不太可能有10个处理器来支持10个并发进程运行。

import threading
from collections import deque
import time
import sys

templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]

def extract_single(cursorconn, from_front):
    while True:
        try:
            sec = q.popleft() if from_front else q.pop()
            #print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
            sys.stdout.flush() # flush output
            time.sleep(sec)
            #print " sleeping done"
            sys.stdout.write("sleeping done by %s\n" % cursorconn)
            sys.stdout.flush() # flush output
        except IndexError:
            return

def main():
    threads = []
    for cur in curlst1:
        t = threading.Thread(target=extract_single, args=(cur, True))
        threads.append(t)
        t.start()
    for cur in curlst2:
        t = threading.Thread(target=extract_single, args=(cur, False))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

if __name__ == '__main__':
    main()
© www.soinside.com 2019 - 2024. All rights reserved.