我有一个使用spring-cloud-stream-stream库的kafka流处理应用程序。该应用程序利用3个application.id值来收听3个主题。对于这些输入主题中的2个,在处理完数据之后,我将消息推送到各个输出主题上,然后使用它们来创建GlobalKTables,如下所示:
streamsBuilder.globalTable(firstSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-1")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
streamsBuilder.globalTable(secondSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-2")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
因此,问题是,它将使用哪个application.id来消耗“ firstSSTopic”和“ secondSSTopic”中的数据?还是仅仅是GlobalStreamThread作为没有任何组的独立使用者?当我检查默认状态目录(tmp / kafka-streams)时,可以看到所有3个application.id目录下的两个全局状态存储的sst和日志文件。我如何避免这种情况?因为这将占用3倍的磁盘空间,并有可能导致存储空间快速填充。
[GlobalKTable
只能用作流表联接的右侧输入。
但是不明白为什么它会将相同的数据持久保存在同一应用程序实例中的多个位置
这提供了使用GlobalKTable
执行联接的能力,而不必重新划分输入流。
如何避免这种情况?
您无法使用KStream
来避免这种情况。