如何从Kafka Stream变更日志主题读取数据?

问题描述 投票:0回答:1

我使用 Kafka Stream 来处理我的主题 A,并使用 inMemoryKeyValueStore。

builder.addStateStore(Stores.keyValueStoreBuilder(
           //Stores.persistentKeyValueStore("AccurateADCounts"),
            Stores.inMemoryKeyValueStore("AccurateADCounts"),
            Serdes.String(),
            Serdes.Integer()),
            "AccurateADProcess");

我跟踪 Kafka Stream 将状态记录回主题

${application.id}-${state-store-name}-changelog

例如

shunwang-streams-filebeat-test-AccurateADCounts-changelog

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shunwang-streams-filebeat-test");

为了使状态存储具有容错能力并允许状态存储迁移而不会丢失数据,状态存储可以在后台持续备份到 Kafka 主题。

我尝试使用该主题,它返回一些不可读的数据。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shunwang-streams-filebeat-test-AccurateADCounts-changelog --from-beginning --new-consumer

返回一些类似的东西:

Xshell    null

问题:

有什么方法可以正确阅读主题

${application.id}-${state-store-name}-changelog

我知道这个主题是针对 Kafka Stream 的,但我仍然想找到一些方法来做到这一点。

apache-kafka apache-kafka-streams
1个回答
0
投票

如果变更日志主题是长类型,您可以使用

--value-deserializer
命令中的
kafka-console-consumer
来读取。例如:

kafka-console-consumer \
--bootstrap-server localhost:29092 \ 
--topic kafka-streams-101-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog \
--property "print.key=true"  \
--value-deserializer "org.apache.kafka.common.serialization.LongDeserializer" \
--from-beginning
© www.soinside.com 2019 - 2024. All rights reserved.