从控制台生成具有空值(墓碑)的 Kafka 消息

问题描述 投票:0回答:3

有没有办法在 kafka-console- Producer 中生成一条带有空值的消息(即标记它以便压缩器用墓碑删除它)?

我尝试过生成“mykey”和“mykey|”。前者产生错误,后者使值成为空字符串。像这样运行制作人:

$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"
apache-kafka kafka-producer-api
3个回答
10
投票

看看kafkacat(kafka 的网络猫)。引用文档:

通过提供 -Z 解释为 NULL 的空消息值,为键“abc”生成墓碑(压缩主题的“删除”):

echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:

2
投票

不幸的是,无法使用控制台生产者来做到这一点

这是 ConsoleProducer 类的代码片段(它如何读取数据)。 Kafka 0.11.0(不认为不同版本之间有很大变化)。

override def readMessage() = {
  lineNumber += 1
  print(">")
  (reader.readLine(), parseKey) match {
    case (null, _) => null
    case (line, true) =>
      line.indexOf(keySeparator) match {
        case -1 =>
          if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
          else throw new KafkaException(s"No key found on line $lineNumber: $line")
        case n =>
          val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
          new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
      }
    case (line, false) =>
      new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
  }
}

如您所见,该值始终是不可为空的字节数组


0
投票

通过此 PR,该功能得以实现。 您可以生成一条带有 Null 值的消息,如下所示:

$ ./bin/kafka-console- Producer.sh --bootstrap-server localhost:9092 --topic test --property null.marker=NULL

请参阅 KIP 的融合页面 获取文档

© www.soinside.com 2019 - 2024. All rights reserved.