假设有一个主题,其中不同文件的块全部混合在一起,由一个元组表示
(FileId, Chunk)
。
同一文件的块也可能有点乱序。
任务是聚合所有文件并将它们存储到某个存储中。
文件数量未绑定。
在伪流 DSL 中可能看起来像这样
topic('chunks')
.groupByKey((fileId, chunk) -> fileId)
.sortBy((fileId, chunk) -> chunk.offset)
.aggregate((fileId, chunk) -> store.append(fileId, chunk));
我想了解kafka Streams是否可以有效地解决这个问题。 由于文件数量不受限制,kafka 将如何管理 groupBy 操作的中间主题?它将使用多少个分区等?在文档中找不到此详细信息。 另外,假设块有一个指示 EOF 的标志。如何表明特定组将不再有任何新数据?
为了完成这项工作,您确实需要一些“EOF”信息,或者知道每个文件的垃圾数量。否则,当接收到所有块时,信息不足。
正如 @Ayoub 在评论中提到的,虽然没有可用的“排序”操作,因此,您需要手动实现排序。另外,考虑到块的数量可能会有所不同,实际上最好的办法可能是不使用 DSL,而是实现带有附加状态存储的自定义
Processor
,以便为您提供完全控制。
builder.stream("topic")
.selectKey((fileId, chunk) -> fileId)
.repartition()
.process(...)
.to("result-topic");
默认情况下,repartition
步骤将使用与上游输入主题相同数量的分区,但您也可以传入
Repartitioned
配置对象并显式设置分区数量。要将状态存储添加到您的
Processor
,您可以在
ConnectedStoreProvider#stores()
中覆盖
ProcessorSupplier
方法。在
Processor
内,您可以根据需要收集/排序文件块,并最终在重新组装文件后从状态存储中删除。