Spout完成后杀死Storm拓扑

问题描述 投票:0回答:1

我创建了一个带有Spout的Storm拓扑,它发出了许多用于基准测试的元组。一旦从spout发出所有元组或者拓扑中不再有任何元组流动,我想停止/终止我的拓扑。

这是我的拓扑结构。

LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 

LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);

//Some Bolts Here

while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");

问题是我在这个范围内引用的loadGenerator实例与在spout线程中运行的实例不同。因此,isRuning()总是返回true,即使在spout线程内部,当没有更多的元组要发出时,它的值为false。

这是LoadGeneratorSource类的一部分。


public class LoadGeneratorSource extends BaseRichSpout {

    private final int throughput;
    private boolean running;
    private final long runtime;


    public LoadGeneratorSource(long runtime,int throughput) {
        this.throughput = throughput;
        this.runtime = runtime;
    }

    @Override
    public void nextTuple() {
        ThroughputStatistics.getInstance().pause(false);

        long endTime = System.currentTimeMillis() + runtime;
        while (running) {
            long startTs = System.currentTimeMillis();

            for (int i = 0; i < throughput; i++) {
                try {
                    emitValue(readNextTuple());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            while (System.currentTimeMillis() < startTs + 1000) {
                // active waiting
            }

            if (endTime <= System.currentTimeMillis())
                setRunning(false);
        }
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    //MORE STUFF

}

一旦没有更多元组从喷口发出或在拓扑中流动,有人能告诉我一种方法来停止我的拓扑吗?感谢您的帮助。

apache-storm apache-storm-topology
1个回答
0
投票

这似乎是Killing storm topology from spout的重复。请尝试那里给出的答案。

只是为了快速总结一下;您尝试这样做的方式不起作用,但您可以使用喷口中的NimbusClient来请求Nimbus终止您的拓扑。一方面的好处是,一旦部署到真正的集群,它也将起作用。

© www.soinside.com 2019 - 2024. All rights reserved.