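I'm writing a Python program that essentially reads records from an IBM i data queue and processes them for external use. It connects with pyodbc, and for each record read in I fork off a child process to handle the information, so that I can process several at once (up to a limit to save resources, currently 5 children in testing).
My code looks roughly like this: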
import os
import signal
import textwrap

import pyodbc

PIDS = []

def main():
    global PIDS
    # Establish connection to GreenPRI
    try:
        cnx = pyodbc.connect('DSN=REDACTED; UID=%s; PWD=%s;' % (some.username, some.password))
    except pyodbc.Error as err:
        return  # TODO: Error Logging
    curs = cnx.cursor()
    # Fetch each data entry from data queue - we need the ordinal position and the message data.
    stmt = textwrap.dedent("""
        SELECT message_data
        FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE(
            DATA_QUEUE => 'MYDTAQ',
            DATA_QUEUE_LIBRARY => 'MYLIB',
            REMOVE => 'YES'
        ))
    """)
    # Loop until the data queue is empty
    forked_pid = -1
    while True:
        # Check to see if any child processes have finished - we should reap them
        for pid in PIDS[:]:
            ret_pid, exit_code = os.waitpid(pid, os.WNOHANG)
            if ret_pid != 0:
                PIDS.remove(pid)  # already reaped, so stop tracking it
        # Get next statement
        curs.execute(stmt)
        row = curs.fetchone()
        if not row:
            break
        # Are we able to spin up a new child process?
        if len(PIDS) >= 5:
            ret_pid, exit_code = os.waitpid(-1, 0)
            PIDS.remove(ret_pid)
        forked_pid = os.fork()
        # We are in the parent process - add child to list, loop
        if forked_pid > 0:
            PIDS.append(forked_pid)
            continue
        # We are in child process - process row, then leave.
        elif forked_pid == 0:
            processRow(row)
            os.kill(os.getpid(), signal.SIGINT)  # yes, this is messy...
            return
        # The result was negative, something went wrong in the child creation process - kill everything
        else:
            for pid in PIDS:  # Wait for children to finish so we can reap them
                ret_pid, exit_code = os.waitpid(pid, 0)
            os.kill(os.getpid(), signal.SIGABRT)
            # TODO: Error logging
    # Cleanup
    for pid in PIDS:  # Wait for children to finish so we can reap them
        ret_pid, exit_code = os.waitpid(pid, 0)
    curs.close()
    cnx.close()
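The problem is that when a child process finishes, the ODBC connection gets closed. The parent process still thinks it is open and tries to use it, and I get the following error:
('24000', '[24000] [unixODBC][Driver Manager]Invalid cursor state (0) (SQLExecDirectW)') pyodbc.ProgrammingError
I could obviously open the connection, read a row, and close the connection again before spinning up each child process, but that adds overhead that feels unnecessary unless there is no better way.
I could also read in every data queue record up front, but that would lose data if the program crashed partway through processing them.
There is some good reading on Stack Overflow about multithreaded programs and ODBC connections, but none of it seems to address my problem specifically (it is mostly about connections that were never closed).
Is there a way to keep the connection alive only in the parent process or (as a messier solution) to force the connection to stay open until I explicitly call cnx.close()?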
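One thing that may be worth trying while staying with os.fork: as far as I know, pyodbc closes a connection when its Connection object is deleted, and a forked child that leaves through normal interpreter unwinding (here, the KeyboardInterrupt raised by SIGINT) finalizes its inherited copy of cnx. os._exit() ends the process without running that cleanup. The sketch below is only that idea in isolation; fork_worker is a hypothetical helper name, and I have not verified it against the IBM i ODBC driver.

```python
import os
import traceback


def fork_worker(work, *args):
    """Fork; run work(*args) in the child; return the child's pid to the parent."""
    pid = os.fork()
    if pid != 0:
        return pid  # parent: the caller keeps the pid so it can reap the child later

    # Child: never let control unwind back into the parent's code path.
    status = 1
    try:
        work(*args)
        status = 0
    except BaseException:
        traceback.print_exc()
    finally:
        # os._exit() skips interpreter cleanup, so objects inherited from the
        # parent (such as an open connection) are not finalized in the child.
        os._exit(status)
```

In the loop above this would replace the fork/kill pair, for example PIDS.append(fork_worker(processRow, row)). The child exits with status 0 on success and 1 if processRow raised.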
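I figured it out: instead of using os.fork, I used multiprocessing's spawn start method. This essentially creates a fresh Python process that has only the scope of the called function. My code now looks like: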
import textwrap
import time

import pyodbc
import multiprocessing as mp
from multiprocessing.connection import wait

CHILDREN = []

def main():
    global CHILDREN
    # Establish connection
    try:
        cnx = pyodbc.connect('DSN=REDACTED; UID=%s; PWD=%s;' % (some.username, some.password))
    except pyodbc.Error as err:
        return  # TODO: Error Logging
    curs = cnx.cursor()
    # Fetch each data entry from WBQSENDPG data queue - we need the ordinal position and the message data.
    stmt = textwrap.dedent("""
        SELECT message_data
        FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE(
            DATA_QUEUE => 'MYDTAQ',
            DATA_QUEUE_LIBRARY => 'MYLIB',
            REMOVE => 'YES'
        ))
    """)
    # Loop until the data queue is empty
    while True:
        # Get rid of any children that are finished
        CHILDREN = [child for child in CHILDREN if child.is_alive()]
        # Get next statement
        curs.execute(stmt)
        row = curs.fetchone()
        if not row:
            break
        # Are we able to spin up a new child process?
        # Poll until at least one child has finished
        while len(CHILDREN) >= 5:
            CHILDREN = [child for child in CHILDREN if child.is_alive()]
        new_child = mp.Process(target=processRow, args=(row,))
        new_child.start()
        CHILDREN.append(new_child)
    # Cleanup: poll until every child has finished
    while len(CHILDREN) > 0:
        CHILDREN = [child for child in CHILDREN if child.is_alive()]
    curs.close()
    cnx.close()

def processRow(row):
    # ... some code ...

if __name__ == '__main__':
    # Set the start method once here; main() runs repeatedly, and calling
    # set_start_method() a second time raises RuntimeError.
    mp.set_start_method('spawn')
    while not [code implemented to handle stopping and interrupts, handled by supervisord]:
        time.sleep(10)
        main()
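I found the solution in this answer about limiting the scope of multithreaded processes.
Using wait() calls would definitely be cleaner, but for now this is a workable solution. Hopefully it helps someone in the future.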
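For the wait() idea, here is a minimal sketch of how the polling loops could be replaced. It assumes the rows come from any iterable (in the post they would come from curs.fetchone()), and process_row is a hypothetical stand-in for processRow. The helper names wait_for_any and run_rows are made up for illustration. It blocks on each child's Process.sentinel through multiprocessing.connection.wait instead of spinning on is_alive().

```python
import multiprocessing as mp
from multiprocessing.connection import wait

MAX_CHILDREN = 5  # same limit as in the post


def process_row(row):
    """Hypothetical stand-in for processRow(); defined at module level so spawn can import it."""
    return None


def wait_for_any(children):
    """Block until at least one child has exited; return the children still running."""
    # A Process.sentinel becomes ready when that process ends.
    ready = wait([child.sentinel for child in children])
    still_running = []
    for child in children:
        if child.sentinel in ready:
            child.join()  # reap the finished child
        else:
            still_running.append(child)
    return still_running


def run_rows(rows, ctx=None, limit=MAX_CHILDREN):
    """Start one child per row, never more than `limit` at once; return the peak child count."""
    if ctx is None:
        ctx = mp.get_context("spawn")
    children = []
    peak = 0
    for row in rows:
        # Sleep (rather than spin) until a slot frees up.
        while len(children) >= limit:
            children = wait_for_any(children)
        child = ctx.Process(target=process_row, args=(row,))
        child.start()
        children.append(child)
        peak = max(peak, len(children))
    # Cleanup: wait for the remaining children.
    while children:
        children = wait_for_any(children)
    return peak
```

With the default spawn context, the call to run_rows has to sit under an if __name__ == '__main__': guard, because each spawned child re-imports the main module. The returned peak is only there to make the throttling easy to check.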