我正在尝试创建一个简单的实用程序来向Kafka发布消息,但需要传递标题以及消息。该实用程序没有标题工作正常但在尝试发送标题时我收到错误。
下面是我正在使用的示例代码 -
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
List<Header> headers = Arrays.asList(new RecordHeader("sample_header", "sample_value".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>("TEST", 0, "key", "sample message", headers);
Future<RecordMetadata> future = producer.send(record);
System.out.println(future.get());
producer.close();
}
我得到的例外是 -
Exception in thread "main" java.lang.IllegalArgumentException: Magic v1 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:424)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:481)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:504)
at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:745)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:634)
at com.test.kafka.KafkaProducerApp.main(KafkaProducerApp.java:45)
通过不同的方式(Spring Cloud Streams Kafka生产者)有同样的错误。我的问题是由于inter.broker.protocol.version
被设置为较低版本,然后是经纪人本身的版本see Kafka's broker configs它允许:
指定将使用哪个版本的代理间协议。在将所有经纪商升级到新版本后,这通常会受到影响。
对于我的部署,经纪人是v1.1
但inter.broker.protocol.version
是v0.10.1
。当v1.1
代理收到带有标题的消息时,它很好,直到它使用不支持标题的v0.10.1
复制消息(产生"Magic v1"
的错误)。