如何从一个KafkaSpout一次发送多个(不同的)元组到螺栓?

问题描述 投票:0回答:1

我是Apache Storm的新手。

我正在尝试使用Apache Kafka,Storm和ESPER CEP引擎开发实时流处理系统。

为此,我有一个KafkaSpout将向Bolts(具有我的CEP查询)发送流来过滤流。

我已经创建了一个拓扑,我试图在本地集群上运行它

问题是在我的螺栓中运行的CEP查询需要批处理元组来对流执行窗口操作。在我的拓扑结构中,KafkaSpout一次只向Bolts发送一个元组进行处理。所以我的CEP查询没有按预期工作。

我在Storm中使用默认的KafkaSpout。有什么方法可以一次向螺栓发送多个不同的元组吗?一些配置调整可以做到这一点,还是我需要为此自定义KafkaSpout?

请帮忙!!

我的拓扑:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(“KafkaSpout”,新的KafkaSpout <>(KafkaSpoutConfig.builder(“localhost:”+ 9092,“weatherdata”)。setProp(ConsumerConfig.GROUP_ID_CONFIG,“weather-consumer-group”)。build()),4 );

builder.setBolt(“A”,new FeatureSelectionBolt(),2).globalGrouping(“KafkaSpout”);

builder.setBolt(“B”,new TrendDetectionBolt(),2).shuffleGrouping(“A”)

我正在使用2个螺栓和一个喷口。

我在博尔特A中运行的esper Query是

从weatherEvent.win:length(3)中选择第一个(e),最后一个(e)作为e

在这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但是我得到了同样的第一个和最后一个事件,因为KafkaSpout一次只发送一个元组。

apache-kafka apache-storm esper bolt spout
1个回答
0
投票

喷口不能这样做,但你可以使用Storm的窗口支持https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只是写一个聚合螺栓并将它放在喷口和拓扑的其余部分之间。

所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection

我建议你试试内置的窗口支持,但如果你想编写自己的聚合,你的螺栓实际上只需要接收一些元组(例如3),并发出一个包含所有值的新元组。

聚合器螺栓应该做类似的事情

private List<Tuple> buffered;

execute(Tuple input) {
  if (buffered.size != 2) {
    buffered.add(input)
    return
  }
  Tuple first = buffered.get(0)
  Tuple second = buffered.get(1)
  Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
  List<Tuple> anchors = List.of(first, second, input)
  collector.emit(anchors, aggregate)
  collector.ack(first, second, input)
  buffered.clear()
}

这样你最终会得到一个包含3个输入元组内容的元组。

© www.soinside.com 2019 - 2024. All rights reserved.