使用Celery和Python将输入和图像处理作业发送到多台机器

问题描述 投票:1回答:1

最近我一直在使用python 3.x在Ubuntu中玩芹菜和花(用于一台机器上的仪表板和任务可视化)。首先我安装了rabbitmq-server,radis,芹菜和花。然后我创建了一个名为tasks.py的脚本,其中包含以下内容:

from celery import Celery

# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')

@app.task
def intensive_sum1(num):
    val = sum(x**4 for x in range(num))
    return val


@app.task
def intensive_sum2(num):
    val = sum(x**4 for x in range(num))
    return val

@app.task
def intensive_sum3(num):
    val = sum(x**4 for x in range(num))
    return val

然后我创建了一个包含run.py的脚本

from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time

start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)

start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)

在运行后者之前,我启动了两个不同的终端,并将目录更改为两个脚本的位置。然后我在一个终端运行sudo celery -A tasks flower,在另一个终端运行celery -A tasks worker --loglevel=info。事实证明(惊喜)芹菜可以将每个任务分配到单个核心,从而节省大量时间。当然,这种节省时间只能用于大型功能,因为较小的功能会产生线程生成开销,这不会带来任何好处。

这让我想到了另一个问题。假设我有3台机器连接到同一台WIFI路由器而不是一台机器。我可以使用ifconfig命令计算出每个Ubuntu机器的IP地址。让我们说其中一台机器是一台主机,它包含一个main.py脚本,该脚本使用Opencv-Python捕获对象捕获实时图像。然后,它将每个图像,序列化并作为消息发送给两个工作机器。两台工作机都独立工作,并且都对同一图像进行反序列化。一个工人机器进行猫分类并返回猫的概率,另一个机器进行狗分类并返回狗的概率。一台工人机器可能需要更长时间才能得出结论。但是,对于该特定帧,主机需要等待两个分类结果,然后在该特定帧上覆盖一些结果。本能地,我被认为主机需要在前进之前检查这两个工作是否准备就绪(e.g. result_worker_one.ready() == result_worker_two.ready() == True)。我怎样才能实现这种行为?如何在主机中序列化一个RGB图像并在工作机器中对其进行反序列化?什么backendbroker需要每台机器?如何将其设置为客户端服务器体系结构?

python multithreading opencv celery flower
1个回答
2
投票

您在多台计算机上分配作业是正确的。事实上,这是芹菜的主要目的之一。

  1. 要检查两个异步作业是否已完成,您可以在芹菜中使用“组和和弦”选项。假设您的两个芹菜任务如下: @app.task def check_dog(): #dog_classification code @app.task def check_cat(): #cat classification code 您可以将这些任务组合在一起然后使用和弦(和弦是仅在组中的所有任务完成后执行的任务)在执行这两个功能后转到下一步。在下面显示的回调函数中包含两个任务后需要的内容。相关文档可以在这里找到:http://docs.celeryproject.org/en/master/userguide/canvas.html#groups chord([check_dog(),check_cat()])(callback)
  2. 看一下图像序列化部分:Passing an image to a celery task
  3. 为了回答问题的第3部分,Celery固有地遵循客户端服务器架构来支持并行计算。每当你调用芹菜任务时,它都会在你设置的消息代理上放置一条消息(在你的情况下你使用了rabbitMQ)。此消息将包含有关要运行的任务以及所有必需参数的信息。消息队列将向不同计算机上的Celery工作人员发送消息。一旦工作人员收到消息,工作人员将执行消息描述的任务。因此,如果要在多台计算机之间分配任务,您只需在每台机器中启动一个芹菜工作者,该机器会监听主机中的消息队列。您可以按如下方式配置工作程序 app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>') 确保为每个芹菜工作者提供任务文件,因为传递给工作人员的消息不包含源代码,而只包含任务名称本身。
© www.soinside.com 2019 - 2024. All rights reserved.