Python cx_oracle多线程不适用于每个线程的游标

问题描述 投票:1回答:2

我试图在python中并行使用cx_Oracle运行完全独立的Oracle查询。

我可以通过为每个线程设置一个新的数据库连接,然后在每个单独的线程中运行查询来成功完成这项工作,这使得总时间从大约2分钟到1分钟20,所以它肯定有效。查询时间:

START_TIME                      END_TIME
17-FEB-16 22.33.28.000000000    17-FEB-16 22.33.30.000000000
17-FEB-16 22.33.30.000000000    17-FEB-16 22.33.33.000000000
17-FEB-16 22.33.33.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.34.08.000000000
17-FEB-16 22.34.08.000000000    17-FEB-16 22.34.26.000000000
17-FEB-16 22.34.26.000000000    17-FEB-16 22.34.27.000000000
17-FEB-16 22.34.27.000000000    17-FEB-16 22.34.29.000000000

然而,在每个线程中建立与数据库的连接存在开销,我很确定我应该能够为每个线程创建一个新游标并共享连接,如下所示:

http://www.oracle.com/technetwork/articles/vasiliev-python-concurrency-087536.html

当我共享连接并使用单独的游标时会发生什么,但是查询都是在同一时间开始然后同时结束所以它看起来就像生成线程时一样,在数据库上查询仍在按顺序运行。查询时间:

START_TIME                      END_TIME
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.32.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000
17-FEB-16 22.36.31.000000000    17-FEB-16 22.38.21.000000000

多连接代码:

for file in file_transporter.complete_file_list:
        #Get database and open connection
        the_db =      shared_lib_wrapper.get_oracle().Oracle(the_logger)
        the_db .connect(conn_str())
        #Create new thread
        thread = threading.Thread(target=Loader, args=(params, the_date, the_logger, the_db, file, file_transporter.complete_file_list[file]))
        the_logger.info("Running Thread: " + thread.getName())
        thread.start()

多游标的代码(在runLoad中有一个创建新游标并执行过程的函数 - 见下文):

for file in self.file_list:
        file_parametes = self.file_list[file]
        function_to_run = file_parametes['LOAD_PACKAGE'] + '.' + file_parametes['LOAD_FUNCTION']

        #Create new thread
        thread = threading.Thread(target=self.runLoad, args=(file_parametes['RUN_ID'], function_to_run))
        self.log.info("Spawned Thread: " + thread.getName())
        self.log.info("Running Thread: " + thread.getName())
        thread.start()

创建游标的代码:

def execute_stored_proc_with_in_and_out_params(self, proc_name, params, dbms_logging=False):
    try:
        cursor = cx_Oracle.Cursor(self.db_conn

因此,我的问题是:

1)创建游标我做错了什么? - 如果有任何关于如何修复它的想法,我已经读过cx_oracle是threadsafety 2:

Currently 2, which means that threads may share the module and connections, but not cursors.

2)如果我不能共享连接是否为每个线程创建一个新的连接有什么问题,它仍然可以提高性能,即使创建每个连接的开销?

python multithreading oracle cx-oracle
2个回答
3
投票

请参阅下面的内容,它是使用相同连接但每个线程中有一个单独游标的程序的工作实现。我调用的过程是在cx_Oracle测试用例(5.2.1版本的一部分)中,并且非常简单,因此我在示例中多次调用它(每个线程中的一个随机数)。输出清楚地表明线程没有同时完成。

from __future__ import print_function

import cx_Oracle
import datetime
import random
import threading

connection = cx_Oracle.Connection("cx_Oracle/dev", threaded = True)

def TestThread(threadNum):
     startTime = datetime.datetime.today()
     cursor = connection.cursor()
     numInputs = int(random.random() * 5000)
     print("Thread", threadNum, "with", numInputs, "inputs:", startTime)
     for i in range(numInputs):
         value = bool(int(random.random() * 2))
         cursor.callfunc("pkg_TestBooleans.GetStringRep", str, (value,))
     endTime = datetime.datetime.today()
     print("Thread", threadNum, "with", numInputs, "inputs:", endTime)

 threads = []
 for i in range(8):
     thread = threading.Thread(target = TestThread, args = (i + 1,))
     threads.append(thread)
     thread.start()
 print("All threads spawned...waiting for them to complete...")
 for thread in threads:
     thread.join()

输出如下:

线程1与3405输入:2016-02-22 07:55:07.849127 线程2与2706输入:2016-02-22 07:55:07.849998 线程3与4101输入:2016-02-22 07:55:07.850256 线程4与2912输入:2016-02-22 07:55:07.850937 线程5与3747输入:2016-02-22 07:55:07.851275 线程6与4318输入:2016-02-22 07:55:07.851534 线程7与1453输入:2016-02-22 07:55:07.852649 线程8与3304输入:2016-02-22 07:55:07.853090 产生的所有线程......等待它们完成...... 线程7与1453输入:2016-02-22 07:55:09.897217 线程2与2706输入:2016-02-22 07:55:11.446744 线程4与2912输入:2016-02-22 07:55:11.681414 线程8与3304输入:2016-02-22 07:55:12.016809 线程1与3405输入:2016-02-22 07:55:12.081846 线程5与3747输入:2016-02-22 07:55:12.266111 线程3与4101输入:2016-02-22 07:55:12.375623 线程6与4318输入:2016-02-22 07:55:12.409352


1
投票
from concurrent.futures import ThreadPoolExecutor, as_completed
import cx_Oracle
import datetime

CONN_INFO = {
    'host': 'xxx.xx.xx.x',
    'port': 99999,
    'user': 'user_name',
    'psw': 'password',
    'service': 'abc.xyz.com',
}

CONN_STR = '{user}/{psw}@{host}:{port}/{service}'.format(**CONN_INFO)

# your long running query
QUERY = 'SELECT FROM * customer where date = :date'

def run(date):
    conn = cx_Oracle.connect(CONN_STR, threaded=True)
    cursor = conn.cursor()
    cursor.execute(QUERY, {'date': date})    
    data = cursor.fetchall()
    cursor.close()

    return data

def main():
    dates = [datetime.datetime.today() - datetime.timedelta(days=x) for x in range(0, 30)] 
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run, d) for d in dates]

        for future in as_completed(futures):
            # process your records from each thread
            # process_records(future.result())


if __name__ == '__main__':
    main()
© www.soinside.com 2019 - 2024. All rights reserved.