我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库。此步骤不会更改数据的性质,并且完全是无状态的。
添加Processor需要创建一个ProcessorSupplier并将此实例与状态存储的名称一起传递给KStream.process()
函数。这是我不明白的。
如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier?
在应用程序启动时,未能添加所述StateStore
会出现此错误:
线程“main”中的异常org.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store。
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的。
通过应用处理器来处理此流中的所有元素,一次一个元素。
这是一张来自simple example on how to use state stores的Confluent Platform documentation on Kafka Streams。
第1步:定义StateStore
/ StateStoreSupplier
:
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
- 我没有看到将StateStore对象添加到拓扑的方法。它也需要StateStoreSupplier。
步骤2:将状态存储添加到拓扑中。
选项A - 使用Processor API时:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// Add the countStore associated with the WordCountProcessor processor
.addStateStore(countStore, "Process")
.addSink("Sink", "sink-topic", "Process");
选项B - 使用Kafka Streams DSL时:
在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")
将状态存储添加到处理器拓扑中。然后,在调用KStream#process()
或KStream#transform()
等方法时,还必须传入状态存储的名称 - 否则您的应用程序将在运行时失败。
以KStream#transform()
为例:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
input.transform(/* your TransformerSupplier */, countStore.name());
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的。
你是对的 - 如果处理器没有维持状态,你就不需要状态存储。
使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")
将状态存储添加到处理器拓扑中,稍后再引用它。