卡夫卡流跳窗顶N按维度划分

问题描述 投票:1回答:1

我有一个kafka流,我需要一个处理器,它可以实现以下功能。

使用一个45秒的跳转窗口和5秒的前进来计算基于域名对象的一个维度的前5名计数。例如,如果流会包含Clickstream数据,我需要按域名查看前5个urls,但也要在跳转窗口中进行窗口化处理。

比如我看到过做窗口统计的例子。

KStream<String, GenericRecord> pageViews = ...;

// Count page views per window, per user, with hopping windows of size 5 minutes that advance every 1 minute
KTable<Windowed<String>, Long> windowedPageViewCounts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), genericAvroSerde))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5).advanceBy(Duration.ofMinutes(1))))
    .count()

比如说,在MusicExample上的Top n aggregations。

songPlayCounts.groupBy((song, plays) ->
            KeyValue.pair(TOP_FIVE_KEY,
                new SongPlayCount(song.getId(), plays)),
        Grouped.with(Serdes.String(), songPlayCountSerde))
        .aggregate(TopFiveSongs::new,
            (aggKey, value, aggregate) -> {
              aggregate.add(value);
              return aggregate;
            },
            (aggKey, value, aggregate) -> {
              aggregate.remove(value);
              return aggregate;
            },
            Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(topFiveSerde)
        );

我只是觉得不能把这两者结合起来 -- 我同时得到窗口和Top n聚合。有什么想法吗?

apache-kafka apache-kafka-streams
1个回答
1
投票

一般来说是的,但是,对于非窗口的top-N聚合,算法将 始终 是一个近似值(不可能得到一个精确的结果,因为我们需要缓冲一下 一切 对于无界输入来说是不可能的)。) 然而,对于跳窗的情况,你会做一个精确的计算。

对于有窗口的情况,实际的聚合步骤,可以只是累积每个窗口的所有输入记录(例如,返回一个 List<V> 或其他一些集合)。) 在这个结果上 KTable 你申请 mapValues() 函数,得到 List<V> 每一个窗口(和键)的输入记录的数量,并能计算出你要找的实际top-N结果。

© www.soinside.com 2019 - 2024. All rights reserved.