我们需要从Kafka主题导出生产数据以将其用于测试目的:数据是用Avro编写的,模式放在Schema注册表中。
我们尝试了以下策略:
kafka-console-consumer
与StringDeserializer
或BinaryDeserializer
。我们无法获得可以用Java解析的文件:解析时我们总是遇到异常,表明文件格式错误。kafka-avro-console-consumer
:它生成一个json,其中也包含一些字节,例如在反序列化BigDecimal时。我们甚至不知道选择哪个解析选项(它不是avro,它不是json)其他不合适的策略:
潜在的合适策略
是不是有一种简单,容易的方法将包含avro数据的Kafka主题的值(而不是模式)的内容转储到文件中,以便可以解析它?我希望使用具有正确选项的kafka-console-consumer以及使用正确的Avro Java Api来实现这一目标。
例如,使用kafka-console-consumer ...我们无法获得可以用Java解析的文件:我们在解析它时总是遇到异常,表明文件格式错误。
您不会使用常规控制台使用者。你可以使用kafka-avro-console-consumer
将二进制avro数据反序列化为json,供你在控制台上阅读。您可以将> topic.txt
重定向到控制台以进行读取。
如果您确实使用了控制台使用者,则无法立即解析Avro,因为您仍需要从数据中提取架构ID(第一个“魔术字节”之后的4个字节),然后使用架构注册表客户端检索架构,只有这样你才能反序列化消息。您用来读取此文件的任何Avro库都是控制台使用者编写的,它希望将一个完整的模式放在文件的标题中,而不仅仅是指向每行注册表中任何内容的ID。 (基本的Avro库对注册表一无所知)
关于控制台消费者is the formatter and the registry唯一可配置的东西。您可以通过另外将解码器导出到CLASSPATH来添加解码器
以这种格式,您可以从Java重新读取它?
为什么不用Java编写Kafka消费者呢? See Schema Registry documentation
将代码打包并放置在某些生产服务器中
不完全确定为什么这是一个问题。如果您可以将SSH代理或VPN连接到生产网络,那么您无需在那里部署任何内容。
你如何导出这些数据
由于您使用的是Schema Registry,我建议使用其中一个Kafka Connect libraries
包含的内容适用于Hadoop,S3,Elasticsearch和JDBC。我认为还有一个FileSink连接器
我们没有找到重置消费者偏移的简单方法
连接器名称控制是否以分布式模式形成新的使用者组。您只需要一个消费者,因此我建议使用独立连接器,您可以在其中设置offset.storage.file.filename
属性以控制偏移量的存储方式。
KIP-199讨论了重置Connect的消费者偏移,但功能没有实现。
但是,你看到Kafka 0.11 how to reset offsets了吗?
其他选项包括Apache Nifi或Streamsets,它们都集成到Schema Registry中,可以解析Avro数据以将其传输到众多系统
与cricket_007一起考虑的一个选择是简单地将数据从一个集群复制到另一个集群。您可以使用Apache Kafka Mirror Maker执行此操作,或使用Confluent中的Replicator。两者都可以选择将某些主题从一个群集复制到另一个群集(例如测试环境)。