使用 org.springframework.messaging.Message 和 Protobuf 对象作为 kafka 消息时出现 ClassCast 错误

问题描述 投票:0回答:1

我正在使用

spring-kafka
和生产者属性,如下

key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

我的代码看起来像这样 -

                         Message<Event> message = MessageBuilder
                        .withPayload(protobufStruct)
                        .setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
                        .build();

                kafkaTemplate.send(topic, new MessageKey(key).toString(), message);

生成消息时,我收到类似错误

引起:java.lang.ClassCastException:类 org.springframework.messaging.support.GenericMessage 无法转换为 com.google.protobuf.Message 类 (org.springframework.messaging.support.GenericMessage 和 com.google.protobuf.Message 位于加载程序“app”的未命名模块中) io.confluence.kafka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34) 在 org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在 org.apache.kafka.clients. Producer.KafkaProducer.doSend(KafkaProducer.java:1015)

我在这里犯了什么错误?

apache-kafka protocol-buffers spring-kafka
1个回答
0
投票

没有重载的

send(topic, key, Message<?>)
方法。

send(topic, K, V)
正在使用,因此您将
GenericMessage
作为有效负载 (
record.value()
) 发送。

请使用

kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);
来代替。

如果您想自己设置时间戳,请使用

send(ProducerRecord<K, V>)
变体。

© www.soinside.com 2019 - 2024. All rights reserved.