风暴领域分组

问题描述 投票:3回答:2

我遇到以下情况:

  • 有许多螺栓可以计算出不同的值
  • 此值将发送到可视化螺栓
  • 可视化螺栓打开一个Web套接字并以某种方式发送值以进行可视化

问题是,可视化螺栓始终是相同的,但它会为每种类型的螺栓发送一条带有不同标题的消息,这些标题可以作为其输入。例如:

  • BoltSum计算总和
  • BoltDif计算差异
  • BoltMul计算多个
  • 所有这些螺栓都使用VisualizationBolt进行可视化
  • 在这种情况下,有三个VisualizationBolt实例

我的问题是,我应该创建3个独立的实例,其中每个实例都有一个线程,例如

builder.setBolt("forSum", new VisualizationBolt(),1).globalGrouping("bolt-sum");
builder.setBolt("forDif", new VisualizationBolt(),1).globalGrouping("bolt-dif");
builder.setBolt("forMul", new VisualizationBolt(),1).globalGrouping("bolt-mul");

或者我应该做以下事情

builder.setBolt("forAll", new VisualizationBolt(),3)
.fieldsGrouping("forSum", new Fields("type"))
.fieldsGrouping("forDif", new Fields("type"))
.fieldsGrouping("forMul", new Fields("type"));

并从之前的每个螺栓中发出类型,因此可以根据它进行分组?

有什么好处?

另外,我是否应该期望每次bolt-sum将转到第一个可视化螺栓,bolt-dif将转到第二个可视化螺栓,bolt-mul将转到第三个可视化螺栓?他们不会混在一起吗?

我认为应该是这种情况,但目前还不是我的实现,所以我不确定这是一个错误还是我错过了什么?

apache-storm
2个回答
3
投票

使用三个实例的第一种方法是正确的方法。使用fieldsGrouping不能确保“sum”值转到“Sum-Visualization-Bolt”,并且sum / diff / mul值都不相同(即,在不同的bolt实例中)。

fieldGrouping的语义更加宽松:它只保证相同类型的所有元组都将由单个螺栓实例处理,即,永远不会是这种情况,两个不同的螺栓实例获得相同的类型。


0
投票

我想你可以使用Partial Key分组(partialKeyGrouping)。在Storm documentation about stream groups上说:

部分密钥分组:流按分组中指定的字段进行分区,如字段分组,但在两个下游螺栓之间进行负载平衡,这可在传入数据偏斜时提供更好的资源利用率。本文对其工作原理及其提供的优势进行了很好的解释。

我使用这种分组实现了一个简单的拓扑结构,与fieldsGrouping相比,Graphite服务器上的图表显示出更好的负载平衡。完整的源代码是here

topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TYPE.getValue(), new SensorAggregateValuesWindowBolt().withTumblingWindow(Duration.seconds(5)), 2)
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .setNumTasks(4) // This will create 4 Bolt instances 
        .addConfiguration(TagSite.SITE.getValue(), TagSite.EDGE.getValue())
        ;

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.