使用芹菜的多重处理,等待线程无法收到通知信号。

问题描述 投票:0回答:1

当我使用芹菜多处理时,等待线程无法接收到通知信号!

但当我用脚本运行代码时,却能正常运行。

这个问题是芹菜对多线程支持不好造成的吗?

如果能解决这个问题,请给我一个提示,谢谢!

# tasks.py

@shared_task(bind=True)
def long_time_def(self, *args, **kwargs):
    tp = ThreadPool(5)
    tp.set_tasks(pv.check_position_effective, list(args))
    res = tp.final_results()
    while len(res) < len(args):
        print(res)
    return 'finished'
# ../public/tools.py

class ThreadPool:
    def __init__(self, max_thread_num=5):
        self.over = False
        self.results = []

        self.func = None
        self.args_list = None
        self.task_num = 0
        self.max_thread_num = max_thread_num

        self.pool = ThreadPoolExecutor(max_workers=max_thread_num)
        self.cond = threading.Condition()

    def set_tasks(self, func, args_list):
        self.task_num = len(args_list)
        self.args_list = args_list
        self.func = func


    def get_result(self, future):
        self.results.append(future.result())
        if len(self.args_list):
            args = self.args_list.pop()
            task = self.pool.submit(self.func, *args)
            task.add_done_callback(self.get_result)
        else:
            print("result:%s"%self.results)
            while self.task_num != len(self.results):
                print(self.results)
                time.sleep(1)
            print('\n', 'finish')
            self.cond.acquire()

            ############ this place ############
            self.cond.notify()
            ############ this place ############

            self.cond.release()
            return

    def _start_tasks(self):
        for i in range(self.max_thread_num):

            if len(self.args_list):
                args = self.args_list.pop()
                task = self.pool.submit(self.func, *args)
                task.add_done_callback(self.get_result)
            else:
                break

    def final_results(self):
        self._start_tasks()
        if self.task_num == len(self.results):
            return self.results
        else:
            # print("main locked")
            # self.cond.acquire()

            ############ this place ############
            print("main waiting")
            self.cond.wait()
            ############ this place ############

            # print("main released")
            # self.cond.release()
            print("main finished")
            return self.results

输出

[2020-04-27 20:53:13,962: INFOMainProcess] 收到任务: position.tasks.long_time_def[****-****] 。

[2020-04-27 20:53:13,991: WARNINGMainProcess] 主等待

[2020-04-27 20:53:29,091: WARNINGMainProcess] result:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 。

[2020-04-27 20:53:29,092: WARNINGMainProcess] [1,1,1,1,1,1,1,1,1,1]

[2020-04-27 20:53:30,145: WARNINGMainProcess] [1,1,1,1,1,1,1,1,1,1]

[2020-04-27 20:53:30,155: WARNINGMainProcess] result:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 。

[2020-04-27 20:53:30,156: WARNINGMainProcess] finish

python django celery python-multithreading
1个回答
0
投票

芹菜使用 台球 用于流程管理,所以暂时只能用Billiard代替多处理。

© www.soinside.com 2019 - 2024. All rights reserved.