具有多个连接的Rq Worker

问题描述 投票:0回答:1

我在同一网络中有 3 台服务器。每台服务器上都运行着一个 redis 服务和某种生产者。生产者将作业排入名为

tasks
的本地 rq 队列。 所以每个服务器都有自己的
tasks
队列。

此外,还有一台服务器正在运行 rq 工作线程。是否可以让该工作人员检查 3 台服务器中每台服务器上的

tasks
队列?

我尝试创建连接列表

import redis
from rq import Queue, Worker
from rq import push_connection
# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))

然后我用它来创建队列列表。

queues = list(map(lambda c: Queue('tasks', connection=c), connections))

然后我推送所有连接

for connection in connections:
    push_connection(connection)

并将队列传递给

Worker

Worker(queues=queues).work()

这会导致工作人员仅侦听

tasks
最后推送的任何连接。

我一直在研究 rq 上的代码,我想我可以编写一个自定义工作类来执行此操作,但在我这样做之前我想问是否还有其他方法。也许甚至是另一个完全的队列框架?

python redis distributed-computing
1个回答
2
投票

好的,我解决了问题。我仍然不确定我是否有权在这里发布实际的源代码,所以我将概述我的解决方案。

我必须覆盖

register_birth(self)
register_death(self)
dequeue_job_and_maintain_ttl(self, timeout)
。这些函数的原始实现可以在here找到。

register_birth(self) -> None

基本上,您必须迭代所有连接,

push_connection(connection)
,完成注册过程,然后
pop_connection()

请注意,仅在

mapping
变量中列出与该连接对应的队列。原始实现使用
queue_names(self)
来获取队列名称列表。您必须执行与
queue_names(self)
相同的操作,但仅限于相关队列。

register_death(self) -> None

本质上与

register_birth
相同。迭代所有连接,
push_connection(connection)
,完成与原始实现相同的步骤,然后
pop_connection()

dequeue_job_and_maintain_ttl(self, timeout: Optional[int]) -> Tuple[Job, Queue]

让我们看一下这个函数的原始实现。我们希望保持一切不变,直到到达

try
块。在这里,我们想要无休止地迭代所有连接。您可以使用 itertools.cycle 来完成此操作。

在循环

push_connection(connection)
内,并将
self.connection
设置为当前连接。如果缺少
self.connection = connection
,作业结果可能无法正确返回。

现在我们将继续调用类似于原始实现的

self.queue_class.dequeue_any
。但我们会将超时设置为
1
,这样如果当前连接没有工作人员的任何工作,我们就可以继续检查另一个连接。

确保使用与当前连接对应的队列列表调用

self.queue_class.dequeue_any
。在这种情况下,
queues
仅包含相关队列。

result = self.queue_class.dequeue_any(
    queues, 1, connection=connection, job_class=self.job_class)

然后

pop_connection()
,并对
result
进行与原始实现相同的检查。如果
result
不是
None
,我们已经找到了一份工作要做,需要
break
退出循环。

保留原始实现中的其他所有内容。不要忘记

break
块末尾的
try
。它打破了
while True
循环。

还有一件事

队列包含对其连接的引用。您可以使用它来创建

(connection, queues)
列表,其中
queues
包含具有连接
connection
的所有队列。

如果将结果列表传递给itertools.cycle,您将获得覆盖

dequeue_job_and_maintain_ttl
所需的无限迭代器。

更新(2023-12-12)

heartbeat(timeout: Optional[int] = None, pipeline: Optional[Pipeline] = None) -> None

最近,我必须解决与心跳相关的问题,并且必须覆盖

heartbeat
。如果给定的
pipeline
参数不是 None 则只需调用父实现。否则,调用
self.connections = {q.connection for q in self.queues}
中所有连接的父实现。

maintain_heartbeats(job: Job)

当工人等待马完成任务时,会调用此函数。它只处理一个连接,因此我们将扩展其功能以处理多个连接。同样,我们迭代所有连接并实现与父级几乎相同的行为。在

maintain_heartbeats
的父实现中,执行添加到管道的操作的结果预计至少有 3 个元素。当我们检查的连接与给定作业的连接不同时,情况并非如此。因此,在访问数组之前,我们必须检查当前正在查看的连接是否与作业的连接相同。
Job
类具有
connection
属性。

set_state(self, state: str, pipeline: Optional[Pipeline] = None)

为每个连接调用父实现。这应该确保每个连接上工作线程的状态都是正确的。

© www.soinside.com 2019 - 2024. All rights reserved.