过滤数据螺栓风暴

问题描述 投票:0回答:1

我有一个简单的Storm拓扑,该拓扑从Kafka读取数据,解析并提取消息字段。我想通过一个字段值过滤元组流,并在另一个字段上执行计数聚合。如何在Storm中做到这一点?我没有找到元组的各个方法(过滤器,聚合),所以我应该直接在字段值上执行这些功能吗?

这里是拓扑:

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

我已经设置了KafkaTwitterBolt来对已解析的字段进行计数和过滤。我设法不按特定字段过滤整个值列表:

class KafkaTwitterBolt() extends BaseBasicBolt{

 override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  val tweetValues = input.getValues.asScala.toList
  val filterTweets = tweetValues
     .map(_.toString)
     .filter(_ contains "big data")
  val resultAllValues = new Values(filterTweets)
  collector.emit(resultAllValues)
 }

 override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
   "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
   "user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
 }
}
filter apache-storm topology
1个回答
0
投票

原来,Storm核心API不允许这样做,为了对任何字段执行过滤,都应使用Trident(它具有内置的过滤器功能)。代码看起来像这样:

 val tridentTopology = new TridentTopology()

    val stream = tridentTopology.newStream("kafka_spout",
      new KafkaTridentSpoutOpaque(spoutConfig))
      .map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
        "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
        "user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
    .filter(new LanguageFilter)

过滤功能本身:

class LanguageFilter extends BaseFilter{

  override def isKeep(tuple: TridentTuple): Boolean = {
    val language = tuple.getStringByField("user.lang")
    println(s"TWEET: $language")
    language.contains("en")
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.