PyODBC connection closes when a child process exits


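I'm writing a Python program that essentially reads records from an IBM i data queue and processes them for external use. It connects through pyodbc, and for each record it reads, I fork off a child process to handle the information so that I can process several at once (up to a limit that conserves resources, currently 5 children in testing).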

My code looks roughly like this:

import os
import signal
import textwrap

import pyodbc

PIDS = []
def main():
    
    global PIDS
    
    #Establish connection to GreenPRI
    try:
        cnx = pyodbc.connect('DSN=REDACTED; UID=%s; PWD=%s;' % (some.username, some.password))
    except pyodbc.Error as err:
        return # TODO: Error Logging
    curs = cnx.cursor()
    
    #Fetch each data entry from data queue - we need the ordinal position and the message data.
    stmt = textwrap.dedent("""
        SELECT message_data
        FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE(
            DATA_QUEUE => 'MYDTAQ',
            DATA_QUEUE_LIBRARY => 'MYLIB',
            REMOVE => 'YES'
        ))
        """)

    #Loop until the data queue is empty
    forked_pid = -1
    while True:
        
        # Check to see if any child processes have finished - we should reap them
        for pid in PIDS:
            ret_pid, exit_code = os.waitpid(pid, os.WNOHANG)
            
        # Get next statement
        curs.execute(stmt)
        row = curs.fetchone()
        if not row:
            break
        
        # Are we able to spin up a new child process?
        if len(PIDS) >= 5:
            ret_pid, exit_code = os.waitpid(-1, 0)
            PIDS.remove(ret_pid)
        
        forked_pid = os.fork()
        
        # We are in the parent process - add child to list, loop
        if forked_pid > 0:
            PIDS.append(forked_pid)
            continue
        
        # We are in child process - process row, then leave.
        elif forked_pid == 0:
            processRow(row)
            os.kill(os.getpid(), signal.SIGINT) #yes, this is messy...
            return
        
        # The result was negative, something went wrong in the child creation process - kill everything
        else:
            for pid in PIDS: # Wait for children to finish so we can reap them
                ret_pid, exit_code = os.waitpid(pid, 0)
            os.kill(os.getpid(), signal.SIGABRT)
            # TODO: Error logging
    
    # Cleanup
    for pid in PIDS: # Wait for children to finish so we can reap them
        ret_pid, exit_code = os.waitpid(pid, 0)
    curs.close()
    cnx.close()

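The problem is that when a child process ends, the ODBC connection gets closed. The parent process still thinks it is open and tries to use it, but I get the following error: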

('24000', '[24000] [unixODBC][Driver Manager]Invalid cursor state (0) (SQLExecDirectW)') pyodbc.ProgrammingError

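I could obviously open the connection, read a row, and close the connection again before spinning up each child process, but that adds overhead that feels unnecessary unless there is no better way.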

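I could also read in every data queue record up front, but then a crash partway through processing would lose the records that had not been handled yet.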

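There is some good reading on Stack Overflow about multithreaded programs and ODBC connections, but none of it seems to address my specific problem (it is mostly about connections that were never closed).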

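Is there a way to keep the connection alive only in the parent process, or (as a messier solution) to force the connection to stay open until I explicitly call cnx.close()?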

python python-multithreading pyodbc
1 Answer

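I figured it out: instead of using os.fork, I now use multiprocessing's spawn start method. This essentially creates a fresh Python process that only has the scope of the function being called. My code now looks like this: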

import textwrap
import time

import pyodbc
import multiprocessing as mp
from multiprocessing.connection import wait

CHILDREN = []

def main():
    
    global CHILDREN
    
    # Establish connection
    try:
        cnx = pyodbc.connect('DSN=REDACTED; UID=%s; PWD=%s;' % (some.username, some.password))
    except pyodbc.Error as err:
        return # TODO: Error Logging
    curs = cnx.cursor()
    
    # Fetch each data entry from WBQSENDPG data queue - we need the ordinal position and the message data.
    stmt = textwrap.dedent("""
        SELECT message_data
        FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE(
            DATA_QUEUE => 'MYDTAQ',
            DATA_QUEUE_LIBRARY => 'MYLIB',
            REMOVE => 'YES'
        ))
        """)
    
    # Loop until the data queue is empty
    mp.set_start_method('spawn', force=True)  # main() runs repeatedly; without force, a second call raises RuntimeError
    while True:
        
        # Get rid of any children that are finished
        CHILDREN = [child for child in CHILDREN if child.is_alive()]
            
        # Get next statement
        curs.execute(stmt)
        row = curs.fetchone()
        if not row:
            break
        
        # Are we able to spin up a new child process?
        while len(CHILDREN) >= 5:
            CHILDREN = [child for child in CHILDREN if child.is_alive()]
        
        new_child = mp.Process(target=processRow, args=(row,))
        new_child.start()
        CHILDREN.append(new_child)
    
    # Cleanup
    while len(CHILDREN) > 0:
        CHILDREN = [child for child in CHILDREN if child.is_alive()]
        
    curs.close()
    cnx.close()

def processRow(row):
    # ... some code ...
    pass

if __name__ == '__main__':
    while not [code implemented to handle stopping and interrupts, handled by supervisord]:
        time.sleep(10)
        main()

I found the solution in this answer about limiting the scope of multithreaded processes.

Using wait() calls would definitely be cleaner, but for now this is a working solution. Hopefully it helps someone in the future.
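
For anyone who wants the wait()-based variant mentioned above, here is a minimal sketch of how the busy loops could be replaced. It is not the code I am running: `run_bounded` and `reap_finished` are hypothetical names, the rows are taken from any iterable rather than from the pyodbc cursor, and the optional `ctx` parameter is only there so a different start method can be swapped in. It relies on `Process.sentinel` and `multiprocessing.connection.wait`, which block until at least one child has exited instead of polling `is_alive()` in a tight loop.

```python
import multiprocessing as mp
from multiprocessing.connection import wait


def reap_finished(running):
    """Block until at least one child exits.

    Returns (still_running, exit_codes_of_finished_children).
    """
    # wait() returns the sentinels of the children that have exited.
    ready = set(wait([child.sentinel for child in running]))
    still_running, exit_codes = [], []
    for child in running:
        if child.sentinel in ready:
            child.join()  # the child has already exited, so this returns promptly
            exit_codes.append(child.exitcode)
        else:
            still_running.append(child)
    return still_running, exit_codes


def run_bounded(rows, worker, max_children=5, ctx=None):
    """Run worker(row) in one child process per row, at most max_children at once.

    Returns the exit codes of all children, in the order they were reaped.
    """
    # Assumption: default to the spawn start method, as in the answer above.
    ctx = ctx or mp.get_context("spawn")
    running, exit_codes = [], []
    for row in rows:
        # At the limit: sleep until a slot frees up instead of spinning.
        while len(running) >= max_children:
            running, done = reap_finished(running)
            exit_codes.extend(done)
        child = ctx.Process(target=worker, args=(row,))
        child.start()
        running.append(child)
    # Cleanup: wait for the remaining children.
    while running:
        running, done = reap_finished(running)
        exit_codes.extend(done)
    return exit_codes
```

With spawn, the worker has to be a top-level function that the child can import, and the call has to sit under the `if __name__ == '__main__':` guard. In the program above, `rows` could be a generator that runs `curs.execute(stmt)` and yields `curs.fetchone()` until the queue is empty, so each record is still only removed from the queue just before its child is started.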
