我是 faust 新手,想要跟踪过去 n 秒内收到的消息数量,并每 l 秒更新一次。我认为跳频表适合这个目的。
为了测试,我每秒向我的主题生成一条消息,并期望以下代码(n = 5 和 l = 1)应从 1 向上“计数”到 5,然后如果消息不断传入,则保持在 5:
hopping_table = app.Table("hopping_table", default = int)\
.hopping(5, 1)
@app.agent(topic)
async def process(stream):
async for value in stream:
hopping_table["sum"] += 1
print(f"sum: {hopping_table['sum'].value()}")
但结果我得到:
[2024-02-04 22:50:55,092] [20956] [WARNING] sum: 1
[2024-02-04 22:50:56,094] [20956] [WARNING] sum: 1
[2024-02-04 22:50:57,096] [20956] [WARNING] sum: 1
[2024-02-04 22:50:58,098] [20956] [WARNING] sum: 1
[2024-02-04 22:50:59,099] [20956] [WARNING] sum: 1
[2024-02-04 22:51:00,101] [20956] [WARNING] sum: 1
我可以通过使用跳跃窗口来实现我想要做的事情吗?还是我的理解总体上是错误的?可惜我没有找到太多关于faust滑动窗的具体信息
提前致谢
我也遇到过这种跳窗行为。我在文章使用 Python、Kafka 和 Faust 进行流处理中找到了唯一实际的答案。我完全同意文章作者的观点
windowed tables which are inadequately explained in the Faust documentation and often lead to confusion
。
作者使用函数来更新跳跃窗口表:
async def update_table(events, key, window_wrapper, inner_table):
t = window_wrapper.get_timestamp()
for window_range in inner_table._window_ranges(t):
prev = inner_table[(key, window_range)]
prev.extend(events)
inner_table[(key, window_range)] = prev