Why does my Apache Storm 2.0 topology restart after 30 seconds?

I have tried several configuration parameters, and even withLocalModeOverride, without any luck. What am I missing here?

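Here is a sample application. After 30 seconds the counter resets and everything starts over from the beginning. Let me know if I can provide any other details.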

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

class Test {
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    private static class Bolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}
java apache-storm
1 Answer

A few things here jump out at me; maybe some of them will help:
