为什么当我将元组发送到ElasticSearch时,我的风暴拓扑没有确认

问题描述 投票:0回答:1

[我刚开始使用Storm,我刚刚开始了Data Architect培训课程,正是在这种情况下,我正面临着今天带给我的问题。

我正在通过名为CurrentPriceSpout的KafkaSpout从kakfa接收消息。到目前为止,一切正常。然后,在CurrentPriceBolt中,我重新发布了一个元组,以便使用EsCurrentPriceBolt将数据写入ElasticSearch。问题在这里。我无法将数据直接写入ElasticSearch,只有在删除拓扑时才写入数据。

是否有一个Storm参数可以通过检索确认来强制写入元组?

我尝试通过添加选项“ .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,5)”,元组在ElasticSearch中写得很好,但未被确认。因此,Storm会无限期地重写它们。

感谢您的帮助蒂埃里

maven elasticsearch apache-kafka apache-storm
1个回答
0
投票

我设法找到问题的答案。主要问题在于ES的设计目的不是要吸收研究项目中生成的尽可能少的数据。默认情况下,ES会分批写入1000个条目的数据。通过此项目,我每30秒生成一个数据,或者每500分钟(或8h20)生成1000个批处理。

所以我详细检查了拓扑的配置,并使用以下选项:

  • es.batch.size.entries:1
  • es.storm.bolt.flush.entries.size:1
  • topology.producer.batch.size:1
  • topology.transfer.batch.size:1

现在它像这样:

...
...

public class App 
{
    ...    
    ...    

    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
    {
        ...
        ...

        StormTopology topology  = topologyBuilder.createTopology();                 // je crée ma topologie Storm
        String topologyName     = properties.getProperty("storm.topology.name");    // je nomme ma topologie
        StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology);               // je démarre ma topologie sur mon cluster storm
        System.out.println( "Topology on remote cluster : Started!" );              
    }


    private static Config getTopologyConfig(Properties properties)
    {
        Config stormConfig = new Config();
        stormConfig.put("topology.workers",                 Integer.parseInt(properties.getProperty("topology.workers")));
        stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
        stormConfig.put("topology.message.timeout.secs",    Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
        stormConfig.put("topology.transfer.batch.size",     Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
        stormConfig.put("topology.producer.batch.size",     Integer.parseInt(properties.getProperty("topology.producer.batch.size")));      
        return stormConfig;
    }

    ...    
    ...    
    ...    
}

现在可以使用了!!

© www.soinside.com 2019 - 2024. All rights reserved.