Python Faust 跳窗表

问题描述 投票:0回答:1

我是 faust 新手,想要跟踪过去 n 秒内收到的消息数量,并每 l 秒更新一次。我认为跳频表适合这个目的。

为了测试,我每秒向我的主题生成一条消息,并期望以下代码(n = 5 和 l = 1)应从 1 向上“计数”到 5,然后如果消息不断传入,则保持在 5:

hopping_table = app.Table("hopping_table", default = int)\
    .hopping(5, 1)


@app.agent(topic)
async def process(stream):
    async for value in stream:
        hopping_table["sum"] += 1
        print(f"sum: {hopping_table['sum'].value()}")

但结果我得到:

[2024-02-04 22:50:55,092] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:56,094] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:57,096] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:58,098] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:59,099] [20956] [WARNING] sum: 1 
[2024-02-04 22:51:00,101] [20956] [WARNING] sum: 1

我可以通过使用跳跃窗口来实现我想要做的事情吗?还是我的理解总体上是错误的?可惜我没有找到太多关于faust滑动窗的具体信息

提前致谢

python faust
1个回答
-1
投票

我也遇到过这种跳窗行为。我在文章使用 Python、Kafka 和 Faust 进行流处理中找到了唯一实际的答案。我完全同意文章作者的观点

windowed tables which are inadequately explained in the Faust documentation and often lead to confusion

作者使用函数来更新跳跃窗口表:

async def update_table(events, key, window_wrapper, inner_table):
    t = window_wrapper.get_timestamp()
    for window_range in inner_table._window_ranges(t): 
        prev = inner_table[(key, window_range)]
        prev.extend(events)
        inner_table[(key, window_range)] = prev
© www.soinside.com 2019 - 2024. All rights reserved.