我使用 KafkaStreams 来处理消息的 kafka,并将累积状态保存在 KeyValueStore 中。
但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。
如果我引用了
Processor
或者甚至是 KeyValueStore
,这会很容易,但我没有引用任何一个。
我正在创建一个
Topology
,它需要一个ProcessorSupplier
和一个StoreBuilder
,这样我就没有对Processor
和Store
的引用。看起来像
final MyProcessorSupplier processorSupplier = ...;
final StoreBuilder<KeyValueStore<PriceKey, Short>> storeBuilder = ...;
final Deserializer<SubscriptionRequest> deserializer = ...;
final Topology topology = new Topology();
topology.addSource("Source", new StringDeserializer(), deserializer, topic)
.addProcessor("Process", processorSupplier, "Source")
.addStateStore(storeBuilder, "Process");
streams = new KafkaStreams(topology, kafkaClientSettings.toProperties());
streams.setUncaughtExceptionHandler(new LoggingStreamsUncaughtExceptionHandler(topic));
streams.start();
因此实际的
Processor
和 Store
是在 KafkaStreams 代码中创建的,我看不到任何参考。
有没有一种方法可以直接操作 KeyValueStore 或将合成消息传递给处理器?
我正在使用 kafka 2.8.2,但计划很快更新到 3.5,如果 3.5 有更好的方法来做到这一点。
您正在尝试从外部源影响流状态,这通常不是推荐的做法。这并不是因为此类操作消除了 Kafka Streams 库提供的处理保证。
解决问题的最佳方法是使用额外的主题。让您的 API 调用将必要的事件发布到该主题,然后使用处理器 API 正确处理来自该流的存储。这样您就可以维持 Kafka Streams 附带的所有处理保证。它还将使您不再需要手动构建拓扑,而是依赖 DSL 和自定义处理器。
您的实现将如下所示:
@RestController
public class MyController {
private final KafkaTemplate<String, MyObject> template;
@PostMapping("/my-api")
public void post(@RequestBody MyObject object) {
kafkaTemplate.send("my-topic", object.getKey(), object);
}
}
同时,在拓扑实现中,假设 Processor1 处理当前流,而 ProcessorAPI 处理来自 API 的事件,为简洁起见,我还跳过了 serde 详细信息。
var stream1 = streamsBuilder.stream("my-old-topic")
.process(() -> new Processor1("my-store-name"), "my-store-name");
var stream2 = streamsBuilder.stream("my-api-topic")
.process(() -> new ProcessorAPI("my-store-name"), "my-store-name");
stream1.merge(stream2);
现在两个处理器都可以访问存储,维护处理保证并处理 API 调用。最重要的是,您可以访问所有 DSL 语法。