具有多种消息类型的卡夫卡和主题(Avro):消费者(由于在类路径上缺少类,Spring Cloud Spring失败了)>

问题描述 投票:0回答:1

主题包含两种类型的消息:PaymentStarted和PaymentCompleted。与消费者有两种分离的微服务。所以:Microservice_1,有consumer1应该抓住PaymentStarted类型;Microservice_2的使用者2应该获取PaymentCompleted类型。此外,Microservice_1在其jvm类路径上仅包含PaymentStarted,并且Microservice_2在其jvm类路径上仅包含PaymentCompleted。

每个用于处理不同类型消息的服务。我正在使用spring-cloud-stream,因此在Microservice_1中,使用者1具有:

@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentStarted'")
    void onPaymentEvents(Message<PaymentStarted> message){
    }

从逻辑上讲微服务_2,使用者2具有:

@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentCompleted'")
    void onPaymentEvents(Message<PaymentCompleted> message){
    }

文档说(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#_using_streamlistener_annotation),过滤是在序列化之前进行的。

但是对于我来说,在条件将其过滤掉之前,默认的反序列化程序(在我的情况下为io.confluent.kafka.serializers.KafkaAvroDeserializer)由于在microservice_1的类路径上缺少PaymentCompleted而失败。与microservice_2和PaymentStarted类似。我不想混用这些域,并在Microservice_1和Microservice_2中保留PaymentStarted和PaymentCompleted pojos。

我试图用我的自定义解串器解决这个问题,它看起来可以处理异常,但这确实很棘手。

[我也很困惑,我上面提到的文档说过滤是在反序列化之前进行的。

感谢您的想法/意见。

主题包含两种类型的消息:PaymentStarted和PaymentCompleted。与消费者有两种分离的微服务。所以:Microservice_1,有consumer1应该抓住PaymentStarted ...

java apache-kafka spring-cloud-stream
1个回答
0
投票

您应该为每种类型使用不同的主题。

© www.soinside.com 2019 - 2024. All rights reserved.