如何操作KafkaStreams KeyValueStore

问题描述 投票:0回答:1

我使用 KafkaStreams 来处理消息的 kafka,并将累积状态保存在 KeyValueStore 中。
但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。
如果我引用了

Processor
或者甚至是
KeyValueStore
,这会很容易,但我没有引用任何一个。

我正在创建一个

Topology
,它需要一个
ProcessorSupplier
和一个
StoreBuilder
,这样我就没有对
Processor
Store
的引用。看起来像

final MyProcessorSupplier processorSupplier = ...;
final StoreBuilder<KeyValueStore<PriceKey, Short>> storeBuilder = ...;
final Deserializer<SubscriptionRequest> deserializer = ...;
final Topology topology = new Topology();
topology.addSource("Source", new StringDeserializer(), deserializer, topic)
        .addProcessor("Process", processorSupplier, "Source")
        .addStateStore(storeBuilder, "Process");

streams = new KafkaStreams(topology, kafkaClientSettings.toProperties());
streams.setUncaughtExceptionHandler(new LoggingStreamsUncaughtExceptionHandler(topic));
streams.start();

因此实际的

Processor
Store
是在 KafkaStreams 代码中创建的,我看不到任何参考。

有没有一种方法可以直接操作 KeyValueStore 或将合成消息传递给处理器?

我正在使用 kafka 2.8.2,但计划很快更新到 3.5,如果 3.5 有更好的方法来做到这一点。

java apache-kafka-streams
1个回答
0
投票

您正在尝试从外部源影响流状态,这通常不是推荐的做法。这并不是因为此类操作消除了 Kafka Streams 库提供的处理保证。

解决问题的最佳方法是使用额外的主题。让您的 API 调用将必要的事件发布到该主题,然后使用处理器 API 正确处理来自该流的存储。这样您就可以维持 Kafka Streams 附带的所有处理保证。它还将使您不再需要手动构建拓扑,而是依赖 DSL 和自定义处理器。

您的实现将如下所示:

@RestController
public class MyController {
  private final KafkaTemplate<String, MyObject> template;

  @PostMapping("/my-api")
  public void post(@RequestBody MyObject object) {
    kafkaTemplate.send("my-topic", object.getKey(), object);
  } 
}

同时,在拓扑实现中,假设 Processor1 处理当前流,而 ProcessorAPI 处理来自 API 的事件,为简洁起见,我还跳过了 serde 详细信息。

var stream1 = streamsBuilder.stream("my-old-topic")
   .process(() -> new Processor1("my-store-name"), "my-store-name");
var stream2 = streamsBuilder.stream("my-api-topic")
   .process(() -> new ProcessorAPI("my-store-name"), "my-store-name");
stream1.merge(stream2);

现在两个处理器都可以访问存储,维护处理保证并处理 API 调用。最重要的是,您可以访问所有 DSL 语法。

© www.soinside.com 2019 - 2024. All rights reserved.