我是Apache Storm的新手。
我正在尝试使用Apache Kafka,Storm和ESPER CEP引擎开发实时流处理系统。
为此,我有一个KafkaSpout将向Bolts(具有我的CEP查询)发送流来过滤流。
我已经创建了一个拓扑,我试图在本地集群上运行它
问题是在我的螺栓中运行的CEP查询需要批处理元组来对流执行窗口操作。在我的拓扑结构中,KafkaSpout一次只向Bolts发送一个元组进行处理。所以我的CEP查询没有按预期工作。
我在Storm中使用默认的KafkaSpout。有什么方法可以一次向螺栓发送多个不同的元组吗?一些配置调整可以做到这一点,还是我需要为此自定义KafkaSpout?
请帮忙!!
我的拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“KafkaSpout”,新的KafkaSpout <>(KafkaSpoutConfig.builder(“localhost:”+ 9092,“weatherdata”)。setProp(ConsumerConfig.GROUP_ID_CONFIG,“weather-consumer-group”)。build()),4 );
builder.setBolt(“A”,new FeatureSelectionBolt(),2).globalGrouping(“KafkaSpout”);
builder.setBolt(“B”,new TrendDetectionBolt(),2).shuffleGrouping(“A”)
我正在使用2个螺栓和一个喷口。
我在博尔特A中运行的esper Query是
从weatherEvent.win:length(3)中选择第一个(e),最后一个(e)作为e
在这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但是我得到了同样的第一个和最后一个事件,因为KafkaSpout一次只发送一个元组。
喷口不能这样做,但你可以使用Storm的窗口支持https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只是写一个聚合螺栓并将它放在喷口和拓扑的其余部分之间。
所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection
。
我建议你试试内置的窗口支持,但如果你想编写自己的聚合,你的螺栓实际上只需要接收一些元组(例如3),并发出一个包含所有值的新元组。
聚合器螺栓应该做类似的事情
private List<Tuple> buffered;
execute(Tuple input) {
if (buffered.size != 2) {
buffered.add(input)
return
}
Tuple first = buffered.get(0)
Tuple second = buffered.get(1)
Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
List<Tuple> anchors = List.of(first, second, input)
collector.emit(anchors, aggregate)
collector.ack(first, second, input)
buffered.clear()
}
这样你最终会得到一个包含3个输入元组内容的元组。