我有一个包含表名的列表,比如说列表的大小是n。现在我有m个服务器,所以我打开了m个游标,每个游标也在另一个列表中。现在我想为每个表调用一个函数,它的参数是这两个列表。
templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]
这些游标以cur = conn.cursor()的形式打开,所以这些是对象。
def extract_single(tableName, cursorconn):
qry2 = "Select * FROM %s"% (tableName)
cursorconn.execute(qry2).fetchall()
print " extraction done"
return
现在我已经打开了5个进程(因为我有5个游标),以便让它们并行运行。
processes = []
x = 0
for x in range(5):
new_p = 'p%x'%x
print "process :", new_p
new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
new_p.start()
processes.append(new_p)
for process in processes:
process.join()
现在我想,一旦这5个进程中的任何一个进程完成,它就应该立即从我的templst中取出第6个表,同样的事情一直持续到所有的templst完成。
如何修改这段代码来实现这种行为?举个简单的例子,我想做什么。让我们把一个templst看成一个int,我想为它调用sleep函数。
templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]
def extract_single(sec, cursorconn):
print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
return
所以,当我启动5个游标时,有可能sleep(1)或sleep(2)先完成,所以当它完成后,我想对该游标运行sleep(3)。
我的真正的查询将依赖于游标,因为它将是SQL查询。
修改后的办法考虑到前面的睡眠例子,我现在想实现的是,假设有10个游标,我的睡眠队列是按递增或递减的顺序排列的。现在我想实现的是,假设有10个游标,我的睡眠队列是按递增或递减的顺序排列的。考虑到列表中的递增顺序,现在10个游标中,前5个游标将取队列中的前5个元素,而我的另一组5个游标将取最后5个元素,所以基本上我的游标队列被分成了两半,一半取最低值,另一半取最高值。 所以基本上我的游标队列分为两半,一半取最低值,另一半取最高值,如果第一半的游标完成了,它应该取下一个最低值,如果第二半的游标完成了,它应该取(n-6)个值,即从终点开始取6个值。
我需要从两边穿越队列,并且有两组游标,每组5个,每组6个。
example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
so cur1 -> 1
cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12
现在cur1先完成,所以需要6个(第一个从前面开始的可用元素)cur2完成,需要7个,以此类推,如果cur10完成,将需要11个(下一个从后面开始的可用元素
以此类推,直到templst的所有元素。
把你的 templst
参数,无论是真实例子中的表名还是下面例子中的休眠秒数,都是在一个多进程队列上。然后每个进程循环读取队列中的下一个项目。当队列空了,就没有更多的工作要做,你的进程就可以返回了。实际上,你已经实现了自己的进程池,每个进程都有自己的专用游标连接。现在我们的函数 extract_single
以队列作为第一个参数,从队列中检索表名或秒参数。
import multiprocessing
import Queue
import time
def extract_single(q, cursorconn):
while True:
try:
sec = q.get_nowait()
print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
except Queue.Empty:
return
def main():
q = multiprocessing.Queue()
templst = [1,2,5,7,4,3,6,8,9,10,11]
for item in templst:
q.put(item) # add items to queue
curlst = [cur1,cur2,cur3,cur4,cur5]
process = []
for i in xrange(5):
p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
process.append(p)
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()
注释
如果你的处理器少于5个,你可以尝试用5个(或更多)线程来运行,在这种情况下,一个普通的 Queue
应使用对象。
更新问题的答案
允许你从队列前部和队列后部删除项目的数据结构被称为一个 辞令 (双端队列)。不幸的是,目前还没有支持多处理的deque版本。但我认为,你的表处理可能和线程一样好用,反正你的计算机不太可能有10个处理器来支持10个并发进程运行。
import threading
from collections import deque
import time
import sys
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]
def extract_single(cursorconn, from_front):
while True:
try:
sec = q.popleft() if from_front else q.pop()
#print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
sys.stdout.flush() # flush output
time.sleep(sec)
#print " sleeping done"
sys.stdout.write("sleeping done by %s\n" % cursorconn)
sys.stdout.flush() # flush output
except IndexError:
return
def main():
threads = []
for cur in curlst1:
t = threading.Thread(target=extract_single, args=(cur, True))
threads.append(t)
t.start()
for cur in curlst2:
t = threading.Thread(target=extract_single, args=(cur, False))
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()