我正在使用Kafka喷口构建Storm拓扑。我从JSON格式的Kafka(没有Zookeeper)中消费,Storm应该输出它。如何为JSON数据类型定义适当的架构?目前,我有这样的代码库和基本的spout实现:
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
cluster.shutdown()
我是Apache Storm的新手,所以希望获得任何建议。
您可以做几件事:
您可以定义RecordTranslator
。此接口使您可以根据从Kafka读取的RecordTranslator
定义喷嘴如何构造元组。
默认实现如下:
ConsumerRecord
如您所见,您将获得一个public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
,这是内置在基础Kafka客户端库中的一种类型,然后必须将其转换为将成为元组值的ConsumerRecord
。如果您想在发送数据之前对记录做一些复杂的事情,那将是您的处理方式。例如,如果您想将键,值和偏移量填充到随后发出的数据结构中,则可以在此处进行操作。您可以使用List<Object>
如果您只想将键/值反序列化为自己的数据类之一,则更好的选择是实现KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
。这将使您定义如何反序列化从Kafka获得的键/值。如果您想反序列化例如您自己的数据类的值,则可以使用此接口进行操作。
默认Deserializer
执行此操作:
Deserializer
一旦创建了自己的StringDeserializer
,就可以通过执行 @Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
之类的方法来使用它。设置值反序列化器也有类似的使用者属性。