How do I use a custom serializer with the Vert.x Kafka client?


I have the following Kafka producer properties:

value.serializer=MyEventSerializer
value.deserializer=MyEventDeserializer
default.value.serde=MyEventSerde

I read the Serializers section on the Vert.x site and created the producer with:

KafkaProducer<String, MyEvent> producer = KafkaProducer.create(vertx, configProperties, String.class, MyEvent.class);

But I get the following error:

SEVERE: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes
java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes

Is there a way to use a custom serializer with the Vert.x Kafka client?

java apache-kafka vert.x
1 Answer

I had to do by hand what KafkaProducer.create() does internally, passing in my own serializer instances:

// The key uses a built-in Vert.x serde; the value uses the custom serializer.
Serializer<String> keySerializer = VertxSerdes.serdeFrom(String.class).serializer();
Serializer<MyEvent> valueSerializer = new MyEventSerializer();
// Wrap a native Kafka producer, which accepts serializer instances directly.
KafkaWriteStream<String, MyEvent> stream = new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(),
    new org.apache.kafka.clients.producer.KafkaProducer<>(configProperties, keySerializer, valueSerializer));
KafkaProducer<String, MyEvent> producer = new KafkaProducerImpl<>(stream).registerCloseHook();
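The error in the question suggests that the class-based create() overload picks its serializer from a fixed table of built-in types, so any class outside that table is rejected before the value.serializer property is considered. The sketch below illustrates that kind of lookup in plain Java. It is not the Vert.x or Kafka source: BuiltInSerializers, LookupDemo, and the nested MyEvent placeholder are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative stand-in for a class-keyed serializer lookup; NOT the Vert.x or Kafka source.
final class BuiltInSerializers {
    private static final Map<Class<?>, Function<Object, byte[]>> REGISTRY = new HashMap<>();

    static {
        // Only a fixed set of types is registered (assumed subset for the sketch).
        REGISTRY.put(String.class, v -> ((String) v).getBytes(StandardCharsets.UTF_8));
        REGISTRY.put(Integer.class, v -> ByteBuffer.allocate(Integer.BYTES).putInt((Integer) v).array());
        REGISTRY.put(Long.class, v -> ByteBuffer.allocate(Long.BYTES).putLong((Long) v).array());
    }

    private BuiltInSerializers() {}

    // Returns the registered serializer, or fails for any type outside the table.
    static Function<Object, byte[]> serializerFor(Class<?> type) {
        Function<Object, byte[]> serializer = REGISTRY.get(type);
        if (serializer == null) {
            throw new IllegalArgumentException(
                "Unknown class for built-in serializer: " + type.getName());
        }
        return serializer;
    }
}

final class LookupDemo {
    // Hypothetical placeholder for the asker's event type.
    static final class MyEvent {}

    public static void main(String[] args) {
        // A built-in type resolves normally.
        System.out.println(BuiltInSerializers.serializerFor(String.class).apply("hello").length);
        try {
            // A custom type is not in the table, so the lookup throws.
            BuiltInSerializers.serializerFor(MyEvent.class);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Constructing the native Kafka producer with explicit serializer instances, as the answer does, never goes through a lookup of this kind. Depending on the client version, a create() overload that takes only the configuration may also honor value.serializer directly; that is worth checking in the API documentation for the version in use.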

Then write the record with:

KafkaProducerRecord<String, MyEvent> producerRecord = KafkaProducerRecord.create(topicName, key, value);

producer.write(producerRecord, done -> {
    if (done.succeeded()) {
        // TODO: handle success
    } else {
        // TODO: handle failure, e.g. inspect done.cause()
    }
});
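The answer does not show MyEventSerializer itself. The following is a minimal sketch of what such a class could look like, under stated assumptions: the MyEvent fields (id, timestamp) and the binary layout are invented for illustration, and the Serializer interface is declared locally as a stand-in that mirrors only the serialize(topic, data) method of Kafka's org.apache.kafka.common.serialization.Serializer, so that the example compiles without the Kafka dependency.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for Kafka's Serializer interface (assumption: only serialize is mirrored).
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical event type; the original post does not show its fields.
final class MyEvent {
    final String id;
    final long timestamp;

    MyEvent(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

final class MyEventSerializer implements Serializer<MyEvent> {
    @Override
    public byte[] serialize(String topic, MyEvent data) {
        if (data == null) {
            return null; // Pass null through, as Kafka's StringSerializer does.
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(data.id);         // 2-byte length prefix + modified UTF-8 bytes
            out.writeLong(data.timestamp); // 8 bytes, big-endian
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}

final class MyEventSerializerDemo {
    public static void main(String[] args) {
        byte[] payload = new MyEventSerializer().serialize("events", new MyEvent("evt-1", 42L));
        System.out.println("Serialized " + payload.length + " bytes");
    }
}
```

In a real project the class would implement Kafka's own interface instead of the local stand-in, and a matching deserializer would have to read the fields back in the same order.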