我有一个简单的Storm拓扑,该拓扑从Kafka读取数据,解析并提取消息字段。我想通过一个字段值过滤元组流,并在另一个字段上执行计数聚合。如何在Storm中做到这一点?我没有找到元组的各个方法(过滤器,聚合),所以我应该直接在字段值上执行这些功能吗?
这里是拓扑:
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
我已经设置了KafkaTwitterBolt来对已解析的字段进行计数和过滤。我设法不按特定字段过滤整个值列表:
class KafkaTwitterBolt() extends BaseBasicBolt{
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val tweetValues = input.getValues.asScala.toList
val filterTweets = tweetValues
.map(_.toString)
.filter(_ contains "big data")
val resultAllValues = new Values(filterTweets)
collector.emit(resultAllValues)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
}
}
原来,Storm核心API不允许这样做,为了对任何字段执行过滤,都应使用Trident(它具有内置的过滤器功能)。代码看起来像这样:
val tridentTopology = new TridentTopology()
val stream = tridentTopology.newStream("kafka_spout",
new KafkaTridentSpoutOpaque(spoutConfig))
.map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
.filter(new LanguageFilter)
过滤功能本身:
class LanguageFilter extends BaseFilter{
override def isKeep(tuple: TridentTuple): Boolean = {
val language = tuple.getStringByField("user.lang")
println(s"TWEET: $language")
language.contains("en")
}
}