如何注册无状态处理器(似乎也需要StateStore)?

问题描述 投票:5回答:1

我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库。此步骤不会更改数据的性质,并且完全是无状态的。

添加Processor需要创建一个ProcessorSupplier并将此实例与状态存储的名称一起传递给KStream.process()函数。这是我不明白的。

如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier

在应用程序启动时,未能添加所述StateStore会出现此错误:

线程“main”中的异常org.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store。

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的。

通过应用处理器来处理此流中的所有元素,一次一个元素。

java apache-kafka-streams
1个回答
12
投票

这是一张来自simple example on how to use state storesConfluent Platform documentation on Kafka Streams

第1步:定义StateStore / StateStoreSupplier

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();
  1. 我没有看到将StateStore对象添加到拓扑的方法。它也需要StateStoreSupplier。

步骤2:将状态存储添加到拓扑中。

选项A - 使用Processor API时:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");

选项B - 使用Kafka Streams DSL时:

在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中。然后,在调用KStream#process()KStream#transform()等方法时,还必须传入状态存储的名称 - 否则您的应用程序将在运行时失败。

KStream#transform()为例:

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的。

你是对的 - 如果处理器没有维持状态,你就不需要状态存储。

使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中,稍后再引用它。

© www.soinside.com 2019 - 2024. All rights reserved.