我最近创建了我的第一个 Kafka 流应用程序用于学习。我使用了 spring-cloud-stream-kafka-binding。这是一个简单的电子商务系统,我在其中阅读一个名为产品的主题,每当有新产品库存进来时,该主题都会包含所有产品条目。我正在汇总数量以获得产品的总量。
我有两个选择 -
我选择了第二个选项,我发现应用程序自己创建了一个 kafka 主题,当我消费来自该主题的消息时,然后得到了聚合的消息。
.peek((k,v) -> LOGGER.info("Received product with key [{}] and value [{}]",k, v))
.groupByKey()
.aggregate(Product::new,
(key, value, aggregate) -> aggregate.process(value),
Materialized.<String, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_AGGREGATE_STATE_STORE).withValueSerde(productEventSerde)//.withKeySerde(keySerde)
// because keySerde is configured in application.properties
);
使用 InteractiveQueryService,我可以在我的应用程序中访问此状态存储,以找出产品的可用总量。
现在有几个问题 -
我的应用程序的代码(其功能比我在这里解释的更多)可以从此链接访问-
内部主题称为变更日志主题,用于容错。聚合的状态使用
RocksDB
存储在本地磁盘上,并以变更日志主题的形式存储在 Kafka 代理上 - 这本质上是一个“备份”。如果任务被移动到新机器或本地状态由于其他原因丢失,Kafka Streams 可以通过从变更日志主题中读取对原始状态的所有更改并将其应用到新的 RocksDB 实例来恢复本地状态。恢复完成后(整个变更日志主题已处理),新机器上应该具有相同的状态,并且新机器可以在旧机器停止的地方继续处理。这有很多复杂的细节(例如,在默认设置中,当发生故障时,同一输入记录的状态可能会更新两次)。
另请参阅https://developer.confluence.io/learn-kafka/kafka-streams/stateful-fault-tolerance/