我有一个kafka流,我需要一个处理器,它可以实现以下功能。
使用一个45秒的跳转窗口和5秒的前进来计算基于域名对象的一个维度的前5名计数。例如,如果流会包含Clickstream数据,我需要按域名查看前5个urls,但也要在跳转窗口中进行窗口化处理。
比如我看到过做窗口统计的例子。
KStream<String, GenericRecord> pageViews = ...;
// Count page views per window, per user, with hopping windows of size 5 minutes that advance every 1 minute
KTable<Windowed<String>, Long> windowedPageViewCounts = pageViews
.groupByKey(Grouped.with(Serdes.String(), genericAvroSerde))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5).advanceBy(Duration.ofMinutes(1))))
.count()
比如说,在MusicExample上的Top n aggregations。
songPlayCounts.groupBy((song, plays) ->
KeyValue.pair(TOP_FIVE_KEY,
new SongPlayCount(song.getId(), plays)),
Grouped.with(Serdes.String(), songPlayCountSerde))
.aggregate(TopFiveSongs::new,
(aggKey, value, aggregate) -> {
aggregate.add(value);
return aggregate;
},
(aggKey, value, aggregate) -> {
aggregate.remove(value);
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
我只是觉得不能把这两者结合起来 -- 我同时得到窗口和Top n聚合。有什么想法吗?
一般来说是的,但是,对于非窗口的top-N聚合,算法将 始终 是一个近似值(不可能得到一个精确的结果,因为我们需要缓冲一下 一切 对于无界输入来说是不可能的)。) 然而,对于跳窗的情况,你会做一个精确的计算。
对于有窗口的情况,实际的聚合步骤,可以只是累积每个窗口的所有输入记录(例如,返回一个 List<V>
或其他一些集合)。) 在这个结果上 KTable
你申请 mapValues()
函数,得到 List<V>
每一个窗口(和键)的输入记录的数量,并能计算出你要找的实际top-N结果。