Creating a Kafka aggregator and joining it with an event


I am trying to create an aggregator that listens to multiple records and merges them into one. After merging, I wait for a process event by joining the stream with the aggregated application in the listen() method. When the process event arrives, some business logic is triggered. Both the aggregator and the process listener are defined in a single Spring Boot project.

    @Bean
    public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
    {
        return formEvent -> formEvent.groupByKey()
                .reduce((k, v) -> v)  // keep only the latest record per FormUUID
                .toStream()
                .selectKey((k, v) -> k.getReferenceNo())
                .groupByKey()
                .aggregate(Application::new, (key, value, aggr) -> aggr.performAggregate(value),
                        Materialized.<UUID, Application, KeyValueStore<Bytes, byte[]>> as("appStore")
                                .withKeySerde(new JsonSerde<>(UUID.class))
                                .withValueSerde(new JsonSerde<>(Application.class)))
                .toStream();
    }

    @Bean
    public BiConsumer<KStream<String, ProcessEvent>, KTable<String, Application>> listen()
    {
        return (eventStream, appTable) ->
        {
            eventStream.join(appTable, (event, app) -> app)
                    .foreach((k, app) -> app.createQuote());
        };
    }

However, now I am facing a SerializationException. The first part (the aggregation) works fine, but the join fails with the following exceptions:

java.lang.ClassCastException: com.xxxxx.datamapper.domain.FormData cannot be cast to com.xxxxx.datamapper.domain.Application
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.3.1.jar:?]

org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store APPLICATION_TOPIC-STATE-STORE-0000000001
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:280) ~[kafka-streams-2.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) ~[kafka-streams-2.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519) ~[kafka-streams-2.3.1.jar:?]

I think the problem lies in my application.yml. Since the "spring.json.key.default.type" property is set to FormUUID, that type is also applied to the Application objects consumed by the listen() method. I would like to configure the remaining types (UUID, Application, and ProcessEvent) in application.yml, but I am not sure how to configure the type mapping for each of the consumers and producers I have defined.
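For reference, the Kafka Streams binder does support per-binding Serde overrides via the `keySerde`/`valueSerde` consumer properties, which would let each binding use its own Serde instead of the global default. The sketch below shows the general shape; whether a plain `JsonSerde` class name picks up the correct target types this way is an assumption on my part, and a custom typed Serde bean may be needed instead:

```yaml
# Sketch only: per-binding Serde overrides (Kafka Streams binder).
# Note: JsonSerde may still need explicit target-type configuration
# or a custom typed Serde bean to deserialize into the right class.
spring.cloud.stream.kafka.streams.bindings:
  listen-in-0.consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.springframework.kafka.support.serializer.JsonSerde
  listen-in-1.consumer:
    keySerde: org.springframework.kafka.support.serializer.JsonSerde
    valueSerde: org.springframework.kafka.support.serializer.JsonSerde
```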

spring.cloud:
 function.definition: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.key.default.type: com.xxxx.datamapper.domain.FormUUID
      spring.json.value.default.type: com.xxxx.datamapper.domain.FormData
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: PROCESS_TOPIC
    listen-in-1:
      destination: APPLICATION_TOPIC
      consumer:
        useNativeDecoding: true
Tags: apache-kafka spring-cloud-stream
1 Answer

If you are using the latest Horsham release of the Spring Cloud Stream Kafka Streams binder, you do not need to set any explicit Serdes for inbound and outbound. However, you still need to provide them wherever the Kafka Streams API itself requires them, as in the aggregate method call above.

If you are getting this serialization error on the inbound of the second processor, I suggest you try removing all the Serdes from your configuration. You can simplify it as below (assuming you are on the latest Horsham release). The binder will infer the correct Serdes to use on inbound/outbound. One benefit of delegating this to the binder is that you do not need to provide any explicit key/value types through configuration, since the binder will introspect those types. Make sure the POJO types you are using are JSON-friendly.

See if that works. If you still run into issues, please create a small sample application where we can reproduce the problem, and we will investigate.

spring.cloud:
 function.definition: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: PROCESS_TOPIC
    listen-in-1.destination: APPLICATION_TOPIC