我正在尝试在 scala 中使用 kafka 流。我想知道是否可以将
KStream
转换为 GlobalKTable
?
你不能将kstream直接转换为globalKtable,但你可以直接创建一个;请参阅 kstreams 了解更详细的基础知识。
用于创建全局Ktable;
val builder = new StreamsBuilder()
val globalMaterialized: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] =
Materialized.as("global_store_name").withKeySerde(Serdes.String).withValueSerde(Serdes.String)
builder.globalTable("topic_name", globalMaterialized)
将 GlobalKtable 连接到处理器以访问它。