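I am using Storm 1.2.1. After configuring it according to the official documentation, I want to collect some metrics in a spout. The spout code is below, but the expected metric data does not show up in graphite-web.
Question 1: How do I use the new metrics reporting API correctly?
Question 2: How can I get the ack count metric from the KafkaSpout bundled with Storm, using either the old or the new metrics API?
Using the new API in the spout to count tuples: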
public static class MyTestWordSpout extends BaseRichSpout {
    // Log under this class's own name rather than TestWordSpout
    public static Logger LOG = LoggerFactory.getLogger(MyTestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;
    // New (metrics v2) API counter
    private Counter tupleCounter;
    // Old API metric, reported in 5-second buckets
    transient CountMetric ackcountMetric;
    long msid = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.tupleCounter = context.registerCounter("tupleCount");
        ackcountMetric = new CountMetric();
        context.registerMetric("ack_count", ackcountMetric, 5);
    }

    public void close() {
    }

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        // Emit with a message ID so that ack()/fail() are called for this tuple
        _collector.emit(new Values(word), msid++);
        this.tupleCounter.inc();
    }

    public void ack(Object msgId) {
        ackcountMetric.incr();
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
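For intuition about what the old-API `CountMetric` registered with a 5-second bucket reports: as far as I understand it, Storm periodically calls `getValueAndReset()` on the metric and passes the result to whatever metrics consumers are registered, so each data point is the number of acks in that bucket rather than a running total. The class below is a stdlib-only stand-in that mimics this behaviour; the name `ResettableCounter` is hypothetical and not a Storm class.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a count-and-reset metric; not part of Storm.
public class ResettableCounter {
    private final AtomicLong value = new AtomicLong();

    // Called once per acked tuple, like ackcountMetric.incr() in ack().
    public void incr() {
        value.incrementAndGet();
    }

    // Called once per reporting bucket: returns the count and starts over at zero.
    public long getValueAndReset() {
        return value.getAndSet(0L);
    }

    public static void main(String[] args) {
        ResettableCounter acks = new ResettableCounter();
        for (int i = 0; i < 3; i++) {
            acks.incr();
        }
        // First bucket: three acks were recorded.
        System.out.println(acks.getValueAndReset());
        acks.incr();
        // Second bucket: only the ack recorded since the last reset.
        System.out.println(acks.getValueAndReset());
    }
}
```

One consequence is that a bucket with no acks reports zero, so a flat zero series in Graphite can mean either that no tuples were acked or that `ack()` is never called.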
storm.yaml:
storm.metrics.reporters:
  # Graphite Reporter
  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
    daemons:
      - "supervisor"
      - "nimbus"
      - "worker"
    report.period: 1
    report.period.units: "SECONDS"
    graphite.host: "10.11.6.79"
    graphite.port: 2003
  # Console Reporter
  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
    daemons:
      - "worker"
    report.period: 1
    report.period.units: "SECONDS"
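When nothing shows up in graphite-web, one way to narrow the problem down is to push a test data point to Carbon by hand, bypassing Storm entirely. Graphite's plaintext protocol accepts one `path value timestamp` line per data point on the plaintext port (2003 in the configuration above). This is a minimal sketch: the class name `GraphiteProbe` and the metric path `storm.test.probe` are made up for illustration, and the default host is simply the one from the storm.yaml in the question.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for checking that Carbon accepts data; not part of Storm.
public class GraphiteProbe {

    // Builds one plaintext-protocol line: "<path> <value> <epoch seconds>\n".
    public static String formatLine(String path, long value, long epochSeconds) {
        return path + " " + value + " " + epochSeconds + "\n";
    }

    // Opens a TCP connection to Carbon and writes a single line.
    public static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000);
            OutputStream out = socket.getOutputStream();
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Host defaults to the graphite.host value from the question's storm.yaml.
        String host = args.length > 0 ? args[0] : "10.11.6.79";
        long now = System.currentTimeMillis() / 1000L;
        send(host, 2003, formatLine("storm.test.probe", 1, now));
    }
}
```

If the test point appears in graphite-web, Carbon is reachable and the problem is more likely on the Storm reporter side; if it does not, the network path or the Carbon configuration is worth checking first.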
Graphite browser: (screenshot)
You can use this library: https://github.com/staslev/storm-metrics-reporter. Add it to your pom.xml:
<dependency>
    <groupId>com.github.staslev</groupId>
    <artifactId>storm-metrics-reporter</artifactId>
    <version>1.5.0</version>
</dependency>
Add this configuration to your topology:
config.put(YammerFacadeMetric.FACADE_METRIC_TIME_BUCKET_IN_SEC, 30);
config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_HOST, "127.0.0.1");
config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_PORT, 2003);
config.put(SimpleGraphiteStormMetricProcessor.REPORT_PERIOD_IN_SEC, 10);
config.put(Config.TOPOLOGY_NAME, YOUR-TOPOLOGY.class.getCanonicalName());
config.registerMetricsConsumer(MetricReporter.class,
        new MetricReporterConfig(".*", SimpleGraphiteStormMetricProcessor.class.getCanonicalName()), 1);
Then add the following call to your bolt's prepare method:
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    StormYammerMetricsAdapter.configure(stormConf, context, new MetricsRegistry());