我尝试了几个配置参数,甚至使用了 withLocalModeOverride,但都没有成功。
我在这里遗漏了什么?
下面是一个示例应用程序:运行 30 秒后计数器会重置,一切又从头开始。如果需要我提供其他详细信息,请告诉我。
package storm.test;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Minimal local-mode Storm topology: one spout emitting an ever-increasing
 * counter and one bolt that logs every value it receives.
 */
class Test {
    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    /**
     * Spout that emits a single-field tuple ("x") holding a counter that
     * starts at 0 and grows by one on every {@link #nextTuple()} call.
     */
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        // Per-instance counter; a freshly created spout instance starts again at 0.
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            // Emitted without a message id, so these tuples are not tracked for ack/fail.
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    /**
     * Terminal bolt: logs the "x" field of each incoming tuple, acks it and
     * emits nothing downstream.
     */
    private static class Bolt extends BaseRichBolt {
        private OutputCollector outputCollector;

        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
            // Keep the collector so execute() can acknowledge tuples.
            this.outputCollector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
            // BaseRichBolt does not ack automatically. Acking keeps the bolt correct
            // if the spout is later changed to emit with message ids.
            outputCollector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // No output streams: this bolt is the end of the topology.
        }
    }

    /**
     * Builds the spout -> bolt topology and submits it to an in-process
     * {@link LocalCluster} under the name "test".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");
            Config conf = new Config();
            // The cluster is deliberately not closed here so the topology keeps running
            // after main() returns.
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace and cause are preserved;
            // e.getMessage() alone may even be null.
            LOG.error("Failed to start local cluster or submit topology 'test'", e);
        }
    }
}
我注意到以下几点,也许其中一些会有所帮助: