StateStore永远不会添加到Spring云中

问题描述 投票:0回答:2

任何人如何在Spring cloud上添加状态存储

我总是收到此错误“嵌套的异常是org.springframework.kafka.KafkaException:无法启动流:;嵌套的异常是org.apache.kafka.streams.errors.TopologyException:无效的拓扑:尚未添加StateStore myStore。 “

这里是bean的定义,但是它永远都行不通

@Bean
  public StoreBuilder storeBuilder() {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
    StoreBuilder<KeyValueStore<String, MyData>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), StreamsSerde.MyDataSerde());
    return storeBuilder;
  }

这里是Serde

public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
    public MyDataSerde() {
      super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));
    }
  }

这里是数据类

public class MyData {
  private String name;
  private String course;
}

这里是Spring Cloud依赖项

springBootVersion = "2.2.5.RELEASE"
set('springCloudVersion', "Hoxton.SR3")

implementation group:"org.springframework.cloud", name: "spring-cloud-stream"
    implementation group: "org.springframework.cloud", name: "spring-cloud-stream-binder-kafka-streams"
    implementation group: "org.springframework.cloud", name: "spring-cloud-starter-stream-kafka" 
spring spring-cloud apache-kafka-streams spring-cloud-stream
2个回答
1
投票

当您必须使用较低级别的处理器或转换器API时,需要添加这样的状态存储。您是否尝试将状态存储添加到processtransform方法调用中?这是一个有效的test。看一下process调用以及状态存储的传递方式。


0
投票

我在本文中找到了以编程方式添加商店的解决方案

public void initializeStateStores() throws Exception {
   StreamsBuilderFactoryBean streamsBuilderFactoryBean =
         applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
   StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
   StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());
   streamsBuilder.addStateStore(keyValueStoreBuilder);
}

https://medium.com/@daniyaryeralin/utilizing-kafka-streams-processor-api-and-implementing-custom-aggregator-6cb23d00eaa7

© www.soinside.com 2019 - 2024. All rights reserved.