[我刚开始使用Storm,我刚刚开始了Data Architect培训课程,正是在这种情况下,我正面临着今天带给我的问题。
我正在通过名为CurrentPriceSpout的KafkaSpout从kakfa接收消息。到目前为止,一切正常。然后,在CurrentPriceBolt中,我重新发布了一个元组,以便使用EsCurrentPriceBolt将数据写入ElasticSearch。问题在这里。我无法将数据直接写入ElasticSearch,只有在删除拓扑时才写入数据。
是否有一个Storm参数可以通过检索确认来强制写入元组?
我尝试通过添加选项“ .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,5)”,元组在ElasticSearch中写得很好,但未被确认。因此,Storm会无限期地重写它们。
感谢您的帮助蒂埃里
我设法找到问题的答案。主要问题在于ES的设计目的不是要吸收研究项目中生成的尽可能少的数据。默认情况下,ES会分批写入1000个条目的数据。通过此项目,我每30秒生成一个数据,或者每500分钟(或8h20)生成1000个批处理。
所以我详细检查了拓扑的配置,并使用以下选项:
现在它像这样:
...
...
public class App
{
...
...
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
...
...
StormTopology topology = topologyBuilder.createTopology(); // je crée ma topologie Storm
String topologyName = properties.getProperty("storm.topology.name"); // je nomme ma topologie
StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology); // je démarre ma topologie sur mon cluster storm
System.out.println( "Topology on remote cluster : Started!" );
}
private static Config getTopologyConfig(Properties properties)
{
Config stormConfig = new Config();
stormConfig.put("topology.workers", Integer.parseInt(properties.getProperty("topology.workers")));
stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
stormConfig.put("topology.message.timeout.secs", Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
stormConfig.put("topology.transfer.batch.size", Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
stormConfig.put("topology.producer.batch.size", Integer.parseInt(properties.getProperty("topology.producer.batch.size")));
return stormConfig;
}
...
...
...
}
现在可以使用了!!