Faust 流媒体代理在空闲期后缓慢“唤醒”

问题描述 投票:0回答:2

我有一个管道,其中向主题 A 生成消息,该消息由流处理器处理并将丰富的数据发送到主题 B。 主题 B 由另外 3 个流处理器消耗,这些处理器独立执行一小部分计算(以减少单个处理器上的负载)并将其丰富的数据转发到新主题。最终处理器读取所有 3 个新主题,并通过 Web 套接字将此数据发送到 Web 客户端。 一切运行良好,但如果系统空闲 30 分钟左右且没有新消息,有时可能需要长达 10 秒才能到达管道末端。正常运行时,这个时间约为 10-20ms。

每个流处理器都使用表来引用以前的数据并确定如何丰富未来,所以我不确定如果随着时间的推移不需要访问该表,访问该表是否会变慢?

如果是这样,这似乎是一个愚蠢的解决方法,但可以使用计时器发送虚拟数据集来触发每个工作人员保持活动状态并保持警惕。

下面是从消息发起到到达管道末端的时间差的打印输出:

[2022-05-23 08:52:46,445] [10340] [WARNING] 0:00:00.017999
[2022-05-23 08:53:03,469] [10340] [WARNING] 0:00:00.025995
[2022-05-23 09:09:46,774] [10340] [WARNING] 0:00:06.179146

我想知道此页面上注明的经纪人或代理商可用的任何设置是否在这里有用?如果有人知道请告诉我。

更新

因此,我进行了测试,使用

@app.time
选项每秒通过整个管道发送一条虚拟/测试消息,并且从未出现过发送时间缓慢的情况。我还更新了工作方式,使用
@app.page()
装饰器直接与应用程序对话,而不是使用 FastAPI 端点来尝试发送到主题,这确实意味着我从未见过超过 2 秒的延迟。但同样的事情仍然会发生,如果它闲置了一段时间然后收到一条新消息,则几乎需要 2 秒(加上更改)才能完成它的操作。这确实开始看起来像是代理会限制其轮询,或者如果吞吐量较低,kafka 会限制代理的连接。

python faust
2个回答
1
投票

问题似乎源于 Kafka 上针对消费者和生产者的设置,如果他们没有在指定时间范围内发送/消费消息,该设置基本上会关闭连接。

在 Faust 中,您可以在定义应用程序时访问它并进行设置,如下所示:

app.conf.producer_connections_max_idle_ms
app.conf.consumer_connections_max_idle_ms

并将其设置为适当的值。我知道这个设置可能会保持较低(默认为 9 分钟),以便大型动态集群释放资源或内存(或其他东西),但在我们的用例中,小型集群在架构和设计方面将保持静态,但它不是(我认为)将其从 9 分钟增加到 12 或 24 小时是一个问题。


0
投票

我认为问题在于主 API 请求服务器并未设置为真正的 faust 应用程序,而是设置为 FastAPI 应用程序,因此我在第一次运行服务器时创建了一个 faust App 对象,然后通过该对象在API 请求,因此它可能需要时不时地经历一些连接过程,这将由一些需要发送到 kafka 服务的请求触发。在定义的时间段后,将根据某些随机请求触发延迟。 解决方案是将请求直接发送到处理此类数据的faust应用程序,或者设置一个真正的faust应用程序来处理所有API请求。

© www.soinside.com 2019 - 2024. All rights reserved.