我正在尝试创建一个聚合器,在该聚合器中,我侦听多个记录并将它们合并为一个。合并后,我通过在listen()方法中加入流和聚合的应用程序来等待流程事件。在流程事件到达时,将触发一些业务逻辑。我已经在单个spring boot项目中定义了聚合器和进程侦听器。
@Bean
public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
{
return formEvent -> formEvent.groupByKey()
.reduce((k, v) -> v)
.toStream()
.selectKey((k, v) -> k.getReferenceNo())
.groupByKey()
.aggregate(Application::new, (key, value, aggr) -> aggr.performAggregate(value),
Materialized.<UUID, Application, KeyValueStore<Bytes, byte[]>> as("appStore")
.withKeySerde(new JsonSerde<>(UUID.class))
.withValueSerde(new JsonSerde<>(Application.class)))
.toStream();
}
@Bean
public BiConsumer<KStream<String, ProcessEvent>, KTable<String, Application>> listen()
{
return (eventStream, appTable) ->
{
eventStream.join(appTable, (event, app) -> app)
.foreach((k, app) -> app.createQuote());
};
}
但是,现在我面临着SerializationException。第一部分(聚合)工作正常,但是连接失败,但出现异常
java.lang.ClassCastException: com.xxxxx.datamapper.domain.FormData cannot be cast to com.xxxxx.datamapper.domain.Application
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.3.1.jar:?]
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store APPLICATION_TOPIC-STATE-STORE-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:280) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519) ~[kafka-streams-2.3.1.jar:?]
我认为,问题出在我的application.yml中。由于“ spring.json.key.default.type”属性设置为FormUUID,因此该方法用于侦听方法中存在的Application对象。我想在application.yml中为其余类型UUID,Application和ProcessEvent配置类型。但不确定如何为定义的每个使用者和生产者配置映射类型。
spring.cloud:
function.definition: process;listen
stream:
kafka.streams:
bindings:
process-in-0.consumer.application-id: form-aggregator
listen-in-0.consumer.application-id: event-processor
listen-in-1.consumer.application-id: event-processor
binder.configuration:
default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.key.default.type: com.xxxx.datamapper.domain.FormUUID
spring.json.value.default.type: com.xxxx.datamapper.domain.FormData
commit.interval.ms: 1000
bindings:
process-in-0.destination: FORM_DATA_TOPIC
process-out-0.destination: APPLICATION_TOPIC
listen-in-0.destination: PROCESS_TOPIC
listen-in-1:
destination: APPLICATION_TOPIC
consumer:
useNativeDecoding: true
如果使用的是最新的Horsham版本的Spring Cloud Stream Kafka Streams活页夹,则无需为入站和出站设置任何显式的Serdes。但是,仍然需要在Kafka Streams API需要它们的任何地方提供它们,就像上面的聚合方法调用一样。如果在第二个处理器的入站时遇到此序列化错误,建议您尝试从配置中删除所有Serdes。您可以在下面进行简化(假设您使用的是最新的Horsham版本)。活页夹将推断出正确的Serdes以用于入站/出站。将其委派给活页夹的一个好处是,您不需要通过配置提供任何显式键/值类型,因为活页夹将对这些类型进行内省。确保您正在使用的POJO类型是JSON友好的。看看是否可行。如果您仍然遇到问题,请创建一个小示例应用程序,在此我们可以重现该问题,我们将进行调查。
spring.cloud:
function.definition: process;listen
stream:
kafka.streams:
bindings:
process-in-0.consumer.application-id: form-aggregator
listen-in-0.consumer.application-id: event-processor
listen-in-1.consumer.application-id: event-processor
binder.configuration:
commit.interval.ms: 1000
bindings:
process-in-0.destination: FORM_DATA_TOPIC
process-out-0.destination: APPLICATION_TOPIC
listen-in-0.destination: PROCESS_TOPIC
listen-in-1.destination: APPLICATION_TOPIC