我使用 Kafka Stream 来处理我的主题 A,并使用 inMemoryKeyValueStore。
builder.addStateStore(Stores.keyValueStoreBuilder(
//Stores.persistentKeyValueStore("AccurateADCounts"),
Stores.inMemoryKeyValueStore("AccurateADCounts"),
Serdes.String(),
Serdes.Integer()),
"AccurateADProcess");
我跟踪 Kafka Stream 将状态记录回主题
${application.id}-${state-store-name}-changelog
例如
shunwang-streams-filebeat-test-AccurateADCounts-changelog
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shunwang-streams-filebeat-test");
为了使状态存储具有容错能力并允许状态存储迁移而不会丢失数据,状态存储可以在后台持续备份到 Kafka 主题。
我尝试使用该主题,它返回一些不可读的数据。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shunwang-streams-filebeat-test-AccurateADCounts-changelog --from-beginning --new-consumer
返回一些类似的东西:
Xshell null
问题:
有什么方法可以正确阅读主题
${application.id}-${state-store-name}-changelog
?
我知道这个主题是针对 Kafka Stream 的,但我仍然想找到一些方法来做到这一点。
如果变更日志主题是长类型,您可以使用
--value-deserializer
命令中的 kafka-console-consumer
来读取。例如:
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic kafka-streams-101-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog \
--property "print.key=true" \
--value-deserializer "org.apache.kafka.common.serialization.LongDeserializer" \
--from-beginning