我有一个仓库应用程序,在其中我需要按小时计算总库存。所有项目移动数据都发送到kafka流(添加/删除)。这意味着,我可以使用开窗的卡夫卡流来获取每小时的汇总运动,像这样]
sourceStream .mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity()) .groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1))) .reduce(Long::sum, Materialized.with(stringSerde, longSerde)).toStream().to("hourly-movement");
但是如何根据此汇总结果得到总库存?例如,对于该数据集,假设起始库存为零:
聚合流结果(按窗口)是这个:
我需要在前端创建小时图,这意味着我需要此数据集:
如何获得此类数据集?原始源流来自stream-logistic-movement
。
我有一个仓库应用程序,在其中我需要按小时计算总库存。所有项目移动数据都发送到kafka流(添加/删除)。这意味着,我可以每小时获取一次...
阅读不同类型的开窗技术可能很有用。在您的情况下,滑动时间窗可能是解决方案。在此处检查替代项:https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#windowing