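I am trying to perform a flatTransform in a Spring Cloud Kafka Streams application, but I am not sure exactly where the KafkaStreamsStateStore annotation should go. At the moment I get this error: Invalid topology: StateStore activeInstruments is not added yet. I would appreciate any guidance.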
@SpringBootApplication
public class InstrumentsApp {

    public static void main(String[] args) {
        SpringApplication.run(InstrumentsApp.class, args);
    }

    public static class InstrumentsConsumer {

        @Bean
        public Serde<InstrumentsRes> instrumentsResSerde() {
            ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
            mapper.registerModule(new JavaTimeModule());
            mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
            return new JsonSerde<>(InstrumentsRes.class, mapper);
        }

        @Bean
        public Serde<Instrument> instrumentSerde() {
            ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
            mapper.registerModule(new JavaTimeModule());
            mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
            mapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
            return new JsonSerde<>(Instrument.class, mapper);
        }

        @Bean
        @KafkaStreamsStateStore(name = "activeInstruments",
                type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
        public Consumer<KStream<String, InstrumentsRes>> process() {
            return instruments -> instruments.flatTransform(
                new TransformerSupplier<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>>() {
                    @Override
                    public Transformer<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>> get() {
                        return new Transformer<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>>() {
                            private ProcessorContext context;
                            private KeyValueStore<String, InstrumentsRes> state;

                            @SuppressWarnings("unchecked")
                            @Override
                            public void init(ProcessorContext context) {
                                this.context = context;
                                // Look up the store by the same name passed to flatTransform below.
                                this.state = (KeyValueStore<String, InstrumentsRes>) context
                                        .getStateStore("activeInstruments");
                            }

                            @Override
                            public Iterable<KeyValue<String, Instrument>> transform(String key,
                                    InstrumentsRes value) {
                                // Emit one record per instrument in the incoming message.
                                List<KeyValue<String, Instrument>> result = new ArrayList<>();
                                for (Instrument instrument : value.result) {
                                    result.add(KeyValue.pair(instrument.instrumentName, instrument));
                                }
                                // Emit a tombstone (null value) for every instrument that was
                                // present in the previous message but is missing from this one.
                                InstrumentsRes prevValue = state.get(key);
                                if (prevValue != null) {
                                    // Fixed: the previous names must come from prevValue, not value.
                                    // Copy the set so removeAll does not mutate the stored object.
                                    HashSet<String> prevInstrumentNames =
                                            new HashSet<>(prevValue.getInstrumentNames());
                                    HashSet<String> newInstrumentNames = value.getInstrumentNames();
                                    prevInstrumentNames.removeAll(newInstrumentNames);
                                    for (String instrumentName : prevInstrumentNames) {
                                        result.add(KeyValue.pair(instrumentName, null));
                                    }
                                }
                                state.put(key, value);
                                return result;
                            }

                            @Override
                            public void close() {
                            }
                        };
                    }
                }, "activeInstruments");
        }
    }
}
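For clarity, the removal-detection step inside transform is just a set difference between the previous and the current instrument names. Here is a minimal standalone sketch in plain Java with no Kafka dependencies. The class name RemovedInstruments, the method removedNames, and the sample names A to D are hypothetical and only there for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RemovedInstruments {

    // Returns the names that were in `previous` but are absent from `current`.
    // The input sets are not modified; the result is sorted for stable output.
    static List<String> removedNames(Set<String> previous, Set<String> current) {
        Set<String> removed = new TreeSet<>(previous); // defensive, sorted copy
        removed.removeAll(current);
        return new ArrayList<>(removed);
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("A", "B", "C");
        Set<String> current = Set.of("B", "C", "D");
        // Only "A" disappeared, so only "A" would get a tombstone record.
        System.out.println(removedNames(previous, current)); // prints [A]
    }
}
```

Each name returned here corresponds to one KeyValue.pair(instrumentName, null) record in the transformer.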
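To answer my own question: I believe the KafkaStreamsStateStore annotation is now deprecated, see https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/676#issuecomment-502923103.
Following the suggestion in that comment, I created a bean of type StoreBuilder, and everything now works as expected.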
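For anyone hitting the same error, here is roughly what such a bean can look like. This is a sketch rather than my exact code: the configuration class name ActiveInstrumentsStoreConfig is hypothetical, the choice of a persistent (rather than in-memory) store is an assumption, and InstrumentsRes and its Serde bean are the ones from the question. It needs Spring and Kafka Streams on the classpath. The store name must match the one passed to flatTransform and to getStateStore.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; InstrumentsRes is the type from the question.
@Configuration
public class ActiveInstrumentsStoreConfig {

    // Declares the "activeInstruments" key-value store as a StoreBuilder bean,
    // replacing the @KafkaStreamsStateStore annotation on process().
    @Bean
    public StoreBuilder<KeyValueStore<String, InstrumentsRes>> activeInstrumentsStore(
            Serde<InstrumentsRes> instrumentsResSerde) {
        return Stores.keyValueStoreBuilder(
                // Assumption: a persistent store; Stores.inMemoryKeyValueStore is the alternative.
                Stores.persistentKeyValueStore("activeInstruments"),
                Serdes.String(),
                instrumentsResSerde);
    }
}
```

With this bean in place, the @KafkaStreamsStateStore annotation on process() is removed, while the "activeInstruments" argument to flatTransform stays as it is.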