我遇到以下情况:
问题是,可视化螺栓始终是相同的,但它会为每种类型的螺栓发送一条带有不同标题的消息,这些标题可以作为其输入。例如:
我的问题是,我应该创建3个独立的实例,其中每个实例都有一个线程,例如
builder.setBolt("forSum", new VisualizationBolt(),1).globalGrouping("bolt-sum");
builder.setBolt("forDif", new VisualizationBolt(),1).globalGrouping("bolt-dif");
builder.setBolt("forMul", new VisualizationBolt(),1).globalGrouping("bolt-mul");
或者我应该做以下事情
builder.setBolt("forAll", new VisualizationBolt(),3)
.fieldsGrouping("forSum", new Fields("type"))
.fieldsGrouping("forDif", new Fields("type"))
.fieldsGrouping("forMul", new Fields("type"));
并从之前的每个螺栓中发出类型,因此可以根据它进行分组?
有什么好处?
另外,我是否应该期望每次bolt-sum将转到第一个可视化螺栓,bolt-dif将转到第二个可视化螺栓,bolt-mul将转到第三个可视化螺栓?他们不会混在一起吗?
我认为应该是这种情况,但目前还不是我的实现,所以我不确定这是一个错误还是我错过了什么?
使用三个实例的第一种方法是正确的方法。使用fieldsGrouping
不能确保“sum”值转到“Sum-Visualization-Bolt”,并且sum / diff / mul值都不相同(即,在不同的bolt实例中)。
fieldGrouping
的语义更加宽松:它只保证相同类型的所有元组都将由单个螺栓实例处理,即,永远不会是这种情况,两个不同的螺栓实例获得相同的类型。
我想你可以使用Partial Key分组(partialKeyGrouping
)。在Storm documentation about stream groups上说:
部分密钥分组:流按分组中指定的字段进行分区,如字段分组,但在两个下游螺栓之间进行负载平衡,这可在传入数据偏斜时提供更好的资源利用率。本文对其工作原理及其提供的优势进行了很好的解释。
我使用这种分组实现了一个简单的拓扑结构,与fieldsGrouping
相比,Graphite服务器上的图表显示出更好的负载平衡。完整的源代码是here。
topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TYPE.getValue(), new SensorAggregateValuesWindowBolt().withTumblingWindow(Duration.seconds(5)), 2)
// .fieldsGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
// .fieldsGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
.partialKeyGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
.partialKeyGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
.setNumTasks(4) // This will create 4 Bolt instances
.addConfiguration(TagSite.SITE.getValue(), TagSite.EDGE.getValue())
;