Apache Storm中的JSON Kafka喷口

问题描述 投票:0回答:1

我正在使用Kafka喷口构建Storm拓扑。我从JSON格式的Kafka(没有Zookeeper)中消费,Storm应该输出它。如何为JSON数据类型定义适当的架构?目前,我有这样的代码库和基本的spout实现:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

我是Apache Storm的新手,所以希望获得任何建议。

json apache-kafka apache-storm spout
1个回答
0
投票

您可以做几件事:

您可以定义RecordTranslator。此接口使您可以根据从Kafka读取的RecordTranslator定义喷嘴如何构造元组。

默认实现如下:

ConsumerRecord

如您所见,您将获得一个public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value"); @Override public List<Object> apply(ConsumerRecord<K, V> record) { return new Values(record.topic(), record.partition(), record.offset(), record.key(), record.value()); } @Override public Fields getFieldsFor(String stream) { return FIELDS; } ,这是内置在基础Kafka客户端库中的一种类型,然后必须将其转换为将成为元组值的ConsumerRecord。如果您想在发送数据之前对记录做一些复杂的事情,那将是您的处理方式。例如,如果您想将键,值和偏移量填充到随后发出的数据结构中,则可以在此处进行操作。您可以使用List<Object>

之类的翻译器

如果您只想将键/值反序列化为自己的数据类之一,则更好的选择是实现KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()。这将使您定义如何反序列化从Kafka获得的键/值。如果您想反序列化例如您自己的数据类的值,则可以使用此接口进行操作。

默认Deserializer执行此操作:

Deserializer

一旦创建了自己的StringDeserializer,就可以通过执行 @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } 之类的方法来使用它。设置值反序列化器也有类似的使用者属性。

© www.soinside.com 2019 - 2024. All rights reserved.