使用pool.imap时无法腌制psycopg2.extensions.connection对象,但可以在单个进程中完成

问题描述 投票:1回答:1

我正在尝试构建一个应用程序,该应用程序将“检出”一个单元,该单元是覆盖地理数据库中一部分土地的正方形,并对该单元中的要素进行分析。由于我要处理的单元很多,因此我使用的是多处理方法。

我使它在对象内部有点像这样:

class DistributedGeographicConstraintProcessor:

    ...

    def _process_cell(self, conn_string):

        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()

            cell_id = self._check_out_cell(cur)
            conn.commit()
            print(f"processing cell_id {cell_id}...")

            for constraint in self.constraints:
                # print(f"processing {constraint.name()}...")
                query = constraint.prepare_distributed_query(self.job, self.grid)
                cur.execute(query, {
                    "buffer": constraint.buffer(),
                    "cell_id": cell_id,
                    "name": constraint.name(),
                    "simplify_tolerance": constraint.simplify_tolerance()
                })

            # TODO: do a final race condition check to further suppress duplicates
            self._check_in_cell(cur, cell_id)
            conn.commit()

        finally:
            del cur
            conn.close()

        return None

    def run(self):

        while True:
            if not self._job_finished():
                params = [self.conn_string] * self.num_cores
                processes = []
                for param in params:
                    process = mp.Process(target=self._process_cell, args=(param,))
                    processes.append(process)
                    sleep(0.1)  # Prevent multiple processes from checkout out the same grid square
                    process.start()
                for process in processes:
                    process.join()
            else:
                self._finalize_job()
                break

但是问题是,它将仅启动四个进程,并等到它们全部完成后再启动四个新进程。

我想这样做,以便当一个进程完成其工作时,它将立即开始在下一个单元格上工作,即使其协同进程尚未完成。

我不确定如何实现此功能,并且我已经尝试过使用这样的池:

def run(self):

    pool = mp.Pool(self.num_cores)
    unprocessed_cells = self._unprocessed_cells()
    for i in pool.imap(self._process_cell, unprocessed_cells):
        print(i)

但是这只是告诉我无法腌制连接:

TypeError: can't pickle psycopg2.extensions.connection objects

但是我不明白为什么,因为它与我在imap目标中使用的功能完全相同。

我已经看过这些线程,这就是为什么它们不回答我的问题:

  • Process-这里的答案仅表示多个进程无法共享同一连接。我知道这一点,并且正在初始化子进程中正在执行的函数中的进程。另外,正如我提到的,当我将函数映射到单个Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects实例时,它也可以使用相同的函数和相同的输入。
  • Process-该问题没有答案,也没有任何注释,代码也不是完整的-作者引用的是问题中未指定的函数,无论如何,显然它们是公然地尝试在进程之间共享相同的游标。

我正在尝试构建一个应用程序,该应用程序将“检出”一个单元,该单元是覆盖地理数据库中一部分土地的正方形,并对该单元中的要素进行分析。由于我...

python multiprocessing psycopg2 multiprocess
1个回答
0
投票

我的猜测是您要在Multiprocessing result of a psycopg2 request. “Can't pickle psycopg2.extensions.connection objects”上附加一些连接对象;尝试仅使用函数(无类/方法)重写您的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.