如何从Kafka主题转储avro数据并在Java / Scala中读取它

问题描述 投票:0回答:2

我们需要从Kafka主题导出生产数据以将其用于测试目的:数据是用Avro编写的,模式放在Schema注册表中。

我们尝试了以下策略:

  • 使用kafka-console-consumerStringDeserializerBinaryDeserializer。我们无法获得可以用Java解析的文件:解析时我们总是遇到异常,表明文件格式错误。
  • 使用kafka-avro-console-consumer:它生成一个json,其中也包含一些字节,例如在反序列化BigDecimal时。我们甚至不知道选择哪个解析选项(它不是avro,它不是json)

其他不合适的策略:

  • 部署特殊的kafka消费者需要我们将该代码打包并放置在某个生产服务器中,因为我们正在谈论我们的生产集群。这太长了。毕竟,是不是kafka控制台消费者已经是具有可配置选项的消费者?

潜在的合适策略

  • 使用kafka连接接收器。我们没有找到一种简单的方法来重置消费者偏移量,因为即使我们删除了接收器,显然连接器创建的消费者仍然处于活动状态

是不是有一种简单,容易的方法将包含avro数据的Kafka主题的值(而不是模式)的内容转储到文件中,以便可以解析它?我希望使用具有正确选项的kafka-console-consumer以及使用正确的Avro Java Api来实现这一目标。

apache-kafka avro
2个回答
1
投票

例如,使用kafka-console-consumer ...我们无法获得可以用Java解析的文件:我们在解析它时总是遇到异常,表明文件格式错误。

您不会使用常规控制台使用者。你可以使用kafka-avro-console-consumer将二进制avro数据反序列化为json,供你在控制台上阅读。您可以将> topic.txt重定向到控制台以进行读取。

如果您确实使用了控制台使用者,则无法立即解析Avro,因为您仍需要从数据中提取架构ID(第一个“魔术字节”之后的4个字节),然后使用架构注册表客户端检索架构,只有这样你才能反序列化消息。您用来读取此文件的任何Avro库都是控制台使用者编写的,它希望将一个完整的模式放在文件的标题中,而不仅仅是指向每行注册表中任何内容的ID。 (基本的Avro库对注册表一无所知)

关于控制台消费者is the formatter and the registry唯一可配置的东西。您可以通过另外将解码器导出到CLASSPATH来添加解码器

以这种格式,您可以从Java重新读取它?

为什么不用Java编写Kafka消费者呢? See Schema Registry documentation

将代码打包并放置在某些生产服务器中

不完全确定为什么这是一个问题。如果您可以将SSH代理或VPN连接到生产网络,那么您无需在那里部署任何内容。

你如何导出这些数据

由于您使用的是Schema Registry,我建议使用其中一个Kafka Connect libraries

包含的内容适用于Hadoop,S3,Elasticsearch和JDBC。我认为还有一个FileSink连接器

我们没有找到重置消费者偏移的简单方法

连接器名称控制是否以分布式模式形成新的使用者组。您只需要一个消费者,因此我建议使用独立连接器,您可以在其中设置offset.storage.file.filename属性以控制偏移量的存储方式。

KIP-199讨论了重置Connect的消费者偏移,但功能没有实现。

但是,你看到Kafka 0.11 how to reset offsets了吗?

其他选项包括Apache Nifi或Streamsets,它们都集成到Schema Registry中,可以解析Avro数据以将其传输到众多系统


0
投票

与cricket_007一起考虑的一个选择是简单地将数据从一个集群复制到另一个集群。您可以使用Apache Kafka Mirror Maker执行此操作,或使用Confluent中的Replicator。两者都可以选择将某些主题从一个群集复制到另一个群集(例如测试环境)。

© www.soinside.com 2019 - 2024. All rights reserved.