kafka Streams groupBy 内部做了什么?

问题描述 投票:0回答:1

假设有一个主题,其中不同文件的块全部混合在一起,由一个元组表示

(FileId, Chunk)

同一文件的块也可能有点乱序。

任务是聚合所有文件并将它们存储到某个存储中。

文件数量未绑定。

在伪流 DSL 中可能看起来像这样

topic('chunks')
    .groupByKey((fileId, chunk) -> fileId)
    .sortBy((fileId, chunk) -> chunk.offset)
    .aggregate((fileId, chunk) -> store.append(fileId, chunk));

我想了解kafka Streams是否可以有效地解决这个问题。 由于文件数量不受限制,kafka 将如何管理 groupBy 操作的中间主题?它将使用多少个分区等?在文档中找不到此详细信息。 另外,假设块有一个指示 EOF 的标志。如何表明特定组将不再有任何新数据?

apache-kafka apache-kafka-streams
1个回答
0
投票

为了完成这项工作,您确实需要一些“EOF”信息,或者知道每个文件的垃圾数量。否则,当接收到所有块时,信息不足。

正如 @Ayoub 在评论中提到的,虽然没有可用的“排序”操作,因此,您需要手动实现排序。另外,考虑到块的数量可能会有所不同,实际上最好的办法可能是不使用 DSL,而是实现带有附加状态存储的自定义

Processor
,以便为您提供完全控制。

builder.stream("topic") .selectKey((fileId, chunk) -> fileId) .repartition() .process(...) .to("result-topic");
默认情况下,

repartition

 步骤将使用与上游输入主题相同数量的分区,但您也可以传入 
Repartitioned
 配置对象并显式设置分区数量。

要将状态存储添加到您的

Processor

,您可以在 
ConnectedStoreProvider#stores()
 中覆盖 
ProcessorSupplier
 方法。

Processor

内,您可以根据需要收集/排序文件块,并最终在重新组装文件后从状态存储中删除。

© www.soinside.com 2019 - 2024. All rights reserved.