我正在使用
spring-kafka
和生产者属性,如下
key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
我的代码看起来像这样 -
Message<Event> message = MessageBuilder
.withPayload(protobufStruct)
.setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
.build();
kafkaTemplate.send(topic, new MessageKey(key).toString(), message);
生成消息时,我收到类似错误
引起:java.lang.ClassCastException:类 org.springframework.messaging.support.GenericMessage 无法转换为 com.google.protobuf.Message 类 (org.springframework.messaging.support.GenericMessage 和 com.google.protobuf.Message 位于加载程序“app”的未命名模块中) io.confluence.kafka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34) 在 org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在 org.apache.kafka.clients. Producer.KafkaProducer.doSend(KafkaProducer.java:1015)
我在这里犯了什么错误?
没有重载的
send(topic, key, Message<?>)
方法。
send(topic, K, V)
正在使用,因此您将 GenericMessage
作为有效负载 (record.value()
) 发送。
请使用
kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);
来代替。
如果您想自己设置时间戳,请使用
send(ProducerRecord<K, V>)
变体。