Kafka Java Producer API unable to serialize the key as Long or Int

Question

This is the Java code I use to produce data to Kafka:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ExampleClass {
  private final static String TOPIC = "my-example-topic";
  private final static String BOOTSTRAP_SERVERS = "confbroker:9092";

  private static Producer<Long, String> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
  }
  private static void runProducer() throws Exception {
    final Producer<Long, String> producer = createProducer();
    long sensorId = 1001L;
    try {
      for (long index = sensorId; index < sensorId + 5; index++) {
        final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "This is sensor no: " + index);
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d)\n", record.key(),
            record.value(), metadata.partition(), metadata.offset());
      }
    } finally {
      producer.flush();
      producer.close();
    }
  }
  public static void main(String... args) throws Exception {
      runProducer();
  }
}

When I run the console consumer on Confluent 5.4.0, this is the result:

[Screenshot of the console consumer output]

The keys are garbled.

How can I produce a key of type Int or Long?

PS:

=> The result is the same with Confluent 5.5.

=> The result is the same with IntegerSerializer.


apache-kafka kafka-producer-api confluent-platform
1 Answer

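The console consumer uses StringDeserializer by default for both the key and the value. If you want the key deserialized as a Long, you have to say so explicitly in the console-consumer command: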

--property key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
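
To see why the key looks garbled, here is a minimal sketch that uses only the JDK and needs no broker. It assumes that LongSerializer writes the key as 8 big-endian bytes and that a string-based reader decodes those same bytes as UTF-8 text. `KeyBytesDemo` and its helper methods are hypothetical names made up for this illustration; they are not part of the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyBytesDemo {

  // Assumption: mimics LongSerializer, which emits the long as 8 big-endian bytes.
  static byte[] encodeLong(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Mimics a string-based reader: interprets the raw key bytes as UTF-8 text.
  static String decodeAsString(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Mimics a long-based reader: reads the 8 bytes back as a big-endian long.
  static long decodeLong(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  // Renders bytes as lowercase hex so the binary layout is visible.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b & 0xff));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    byte[] key = encodeLong(1001L);

    // The key is binary, not the characters '1','0','0','1'.
    System.out.println("raw bytes (hex): " + toHex(key)); // 00000000000003e9

    // Decoding binary data as text yields control characters, hence the garbled key.
    System.out.println("read as text equals \"1001\"? " + decodeAsString(key).equals("1001"));

    // Decoding with the matching type recovers the original value.
    System.out.println("read as long: " + decodeLong(key)); // 1001
  }
}
```

If that assumption holds, the producer code in the question is already correct and only the consumer side needs the matching deserializer. The same reasoning applies to IntegerSerializer, with 4 bytes instead of 8.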