最近我一直在使用python 3.x在Ubuntu中玩芹菜和花(用于一台机器上的仪表板和任务可视化)。首先我安装了rabbitmq-server,radis,芹菜和花。然后我创建了一个名为tasks.py
的脚本,其中包含以下内容:
from celery import Celery
# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')
@app.task
def intensive_sum1(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum2(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum3(num):
val = sum(x**4 for x in range(num))
return val
然后我创建了一个包含run.py
的脚本
from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time
start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)
start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)
在运行后者之前,我启动了两个不同的终端,并将目录更改为两个脚本的位置。然后我在一个终端运行sudo celery -A tasks flower
,在另一个终端运行celery -A tasks worker --loglevel=info
。事实证明(惊喜)芹菜可以将每个任务分配到单个核心,从而节省大量时间。当然,这种节省时间只能用于大型功能,因为较小的功能会产生线程生成开销,这不会带来任何好处。
这让我想到了另一个问题。假设我有3台机器连接到同一台WIFI路由器而不是一台机器。我可以使用ifconfig
命令计算出每个Ubuntu机器的IP地址。让我们说其中一台机器是一台主机,它包含一个main.py
脚本,该脚本使用Opencv-Python捕获对象捕获实时图像。然后,它将每个图像,序列化并作为消息发送给两个工作机器。两台工作机都独立工作,并且都对同一图像进行反序列化。一个工人机器进行猫分类并返回猫的概率,另一个机器进行狗分类并返回狗的概率。一台工人机器可能需要更长时间才能得出结论。但是,对于该特定帧,主机需要等待两个分类结果,然后在该特定帧上覆盖一些结果。本能地,我被认为主机需要在前进之前检查这两个工作是否准备就绪(e.g. result_worker_one.ready() == result_worker_two.ready() == True
)。我怎样才能实现这种行为?如何在主机中序列化一个RGB图像并在工作机器中对其进行反序列化?什么backend
和broker
需要每台机器?如何将其设置为客户端服务器体系结构?
您在多台计算机上分配作业是正确的。事实上,这是芹菜的主要目的之一。
@app.task
def check_dog():
#dog_classification code
@app.task
def check_cat():
#cat classification code
您可以将这些任务组合在一起然后使用和弦(和弦是仅在组中的所有任务完成后执行的任务)在执行这两个功能后转到下一步。在下面显示的回调函数中包含两个任务后需要的内容。相关文档可以在这里找到:http://docs.celeryproject.org/en/master/userguide/canvas.html#groups
chord([check_dog(),check_cat()])(callback)
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>')
确保为每个芹菜工作者提供任务文件,因为传递给工作人员的消息不包含源代码,而只包含任务名称本身。