风暴中的背压

问题描述 投票:0回答:2

在我们的 storm 1.0.2 应用程序中,我们面临内存不足异常。在调试时,我们看到 Kafka spout 向螺栓发出了太多消息。螺栓以接近 4.0 的容量运行。那么有没有一种方法可以在风暴中启用背压,以便喷口根据螺栓的容量排放。尝试启用 topology.backpressure.enable 为 true 但遇到了这个问题 https://issues.apache.org/jira/browse/STORM-1949。我们正在使用开箱即用的 KafkaSpout 实现,并为我们的螺栓扩展 BaseRichBolt。我们的 DAG 是线性的。

apache apache-storm
2个回答
7
投票

可以通过在拓扑配置中设置maxSpoutPending值来处理KafkaSpout的背压,

Config config = new Config();
config.setMaxSpoutPending(200); 
config.setMessageTimeoutSecs(100);

StormSubmitter.submitTopology("testtopology", config, builder.createTopology());

maxSpoutPending 是给定时间拓扑中可以等待确认的元组数。设置此属性,将通知 KafkaSpout 不再使用来自 Kafka 的任何数据,除非未确认的元组计数小于 maxSpoutPending 值。


0
投票

看这条评论:

/** * 指定在使用 {@link ProcessingGuarantee} 时 spout 是否应该要求 Storm 跟踪发出的元组 * {@link ProcessingGuarantee#AT_LEAST_ONCE}。在提供至少一次保证时,spout 将始终跟踪发出的元组 * 无论此设置如何。默认情况下此设置为 false。 * *

即使在可靠性不是问题的情况下,启用跟踪也很有用,因为它允许 * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} 产生效果,并启用一些喷口指标(例如完整延迟) *否则将被禁用。 * * @param tupleTrackingEnforced 如果 Storm 应该跟踪发出的元组,则为 true,否则为 false */


我认为这个配置可以帮助:

成分:

  • id: "kafkaRecordTranslator" 类名:“ir.zarebin.fse.stormcrawler.spout.KafkaRecordTranslator”

  • id: "spoutConfigBuilder"

    类名:“org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder” 构造函数参数:

    • ${ENV-KAFKA_ADDRESS}
    • [${ENV-KAFKA_TOPIC_NAME}]

    属性:

    • 名称:“处理保证” 值:AT_LEAST_ONCE
    • name: "recordTranslator" 参考:“kafkaRecordTranslator”
    • 名称:“tupleTrackingEnforced” 价值:真
© www.soinside.com 2019 - 2024. All rights reserved.