如何在DSL中使用KeyValueStore状态存储?

问题描述 投票:0回答:1
KeyValueStore<String, Long> kvStore=(KeyValueStore<String, Long>) 
Stores.create("InterWindowStore1").withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .build().get();` 

我已经创建了statestore,如上面的代码所示,并尝试插入到kvStore.put(key, value);,但它给我NPE

Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:95)
apache-kafka-streams
1个回答
1
投票

正如您在评论中描述的那样,您基本上是在进行窗口聚合:

KStream stream = ...
KTable table = stream.groupByKey().aggregate(..., TimeWindow.of(...));

由于KTable流可能包含窗口聚合的更新,因此您需要修改此流。为此,您可以使用有状态变压器或值变换器:

StateStoreSupplier myState = State.create("nameOfMyState")....;

KStream result = table.toStream().transform(..., "nameOfMyState");

最后,您可以将结果写入输出主题:

result.to("output-topic");

您提供给Transformertransform可以通过init()中的给定上下文获取状态,并在每次生成/更新窗口输出时在transform()中使用。

© www.soinside.com 2019 - 2024. All rights reserved.