Storm-kafka-mongoDB集成

问题描述 投票:0回答:1

我正在从Kafka生产者连续阅读500 MB随机元组,并在风暴拓扑中使用Mongo Java Driver将其插入MongoDb。问题是我的吞吐量非常低,为每秒4-5元组。

如果我写一个简单的print语句而没有DB插入,我的吞吐量为每秒684个元组。我计划从Kafka运行1M条记录,并使用mongo insert检查吞吐量。

我尝试使用kafkaconfig中的config setMaxSpoutPending,setMessageTimeoutSecs parms进行调整。

   final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
    kafkaConf.ignoreZkOffsets=false;
    kafkaConf.useStartOffsetTimeIfOffsetOutOfRange=true;
    kafkaConf.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
    kafkaConf.stateUpdateIntervalMs=2000;
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    final TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
    topologyBuilder.setBolt("print-messages", new MyKafkaBolt()).shuffleGrouping("kafka-spout");
     Config conf = new Config();
     conf.setDebug(true);
     conf.setMaxSpoutPending(1000);
     conf.setMessageTimeoutSecs(30);

执行螺栓的方法

      JSONObject jObj = new JSONObject();
    jObj.put("key", input.getString(0));

        if (null !=jObj && jObj.size() > 0 ) {
            final DBCollection quoteCollection = dbConnect.getConnection().getCollection("stormPoc");
            if (quoteCollection != null) {
                BasicDBObject dbObject = new BasicDBObject();
                dbObject.putAll(jObj);
                quoteCollection.insert(dbObject);
            //  logger.info("inserted in Collection !!!");
            } else {
                logger.info("Error while inserting data in DB!!!");
            }
            collector.ack(input);
mongodb apache-kafka performance-testing apache-storm
1个回答
0
投票

有一个storm-mongodb模块可以与Mongo集成。它不能完成这项工作吗? https://github.com/apache/storm/tree/b07413670fa62fec077c92cb78fc711c3bda820c/external/storm-mongodb

你不应该使用storm-kafka进行Kafka集成,它已被弃用。请改用storm-kafka-client

设置conf.setDebug(true)会影响你的处理,因为Storm会记录每个元组相当大量的文本。

© www.soinside.com 2019 - 2024. All rights reserved.