当我使用芹菜多处理时,等待线程无法接收到通知信号!
但当我用脚本运行代码时,却能正常运行。
这个问题是芹菜对多线程支持不好造成的吗?
如果能解决这个问题,请给我一个提示,谢谢!
# tasks.py
@shared_task(bind=True)
def long_time_def(self, *args, **kwargs):
tp = ThreadPool(5)
tp.set_tasks(pv.check_position_effective, list(args))
res = tp.final_results()
while len(res) < len(args):
print(res)
return 'finished'
# ../public/tools.py
class ThreadPool:
def __init__(self, max_thread_num=5):
self.over = False
self.results = []
self.func = None
self.args_list = None
self.task_num = 0
self.max_thread_num = max_thread_num
self.pool = ThreadPoolExecutor(max_workers=max_thread_num)
self.cond = threading.Condition()
def set_tasks(self, func, args_list):
self.task_num = len(args_list)
self.args_list = args_list
self.func = func
def get_result(self, future):
self.results.append(future.result())
if len(self.args_list):
args = self.args_list.pop()
task = self.pool.submit(self.func, *args)
task.add_done_callback(self.get_result)
else:
print("result:%s"%self.results)
while self.task_num != len(self.results):
print(self.results)
time.sleep(1)
print('\n', 'finish')
self.cond.acquire()
############ this place ############
self.cond.notify()
############ this place ############
self.cond.release()
return
def _start_tasks(self):
for i in range(self.max_thread_num):
if len(self.args_list):
args = self.args_list.pop()
task = self.pool.submit(self.func, *args)
task.add_done_callback(self.get_result)
else:
break
def final_results(self):
self._start_tasks()
if self.task_num == len(self.results):
return self.results
else:
# print("main locked")
# self.cond.acquire()
############ this place ############
print("main waiting")
self.cond.wait()
############ this place ############
# print("main released")
# self.cond.release()
print("main finished")
return self.results
[2020-04-27 20:53:13,962: INFOMainProcess] 收到任务: position.tasks.long_time_def[****-****] 。
[2020-04-27 20:53:13,991: WARNINGMainProcess] 主等待
[2020-04-27 20:53:29,091: WARNINGMainProcess] result:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 。
[2020-04-27 20:53:29,092: WARNINGMainProcess] [1,1,1,1,1,1,1,1,1,1]
[2020-04-27 20:53:30,145: WARNINGMainProcess] [1,1,1,1,1,1,1,1,1,1]
[2020-04-27 20:53:30,155: WARNINGMainProcess] result:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 。
[2020-04-27 20:53:30,156: WARNINGMainProcess] finish
芹菜使用 台球 用于流程管理,所以暂时只能用Billiard代替多处理。