动态节流Apache喷口

问题描述 投票:0回答:1

我有一个拓扑结构,其中spout从Kafka读取数据并发送到bolt,然后依次调用REST API(A)和另一个REST API(B)。到目前为止,API B还没有节流。现在,他们已经实现了节流(每时钟分钟x最大呼叫数)。

我们需要实现节流处理程序。

选项A

起初,我们考虑在REST API(A)级别进行操作,并放置一个

Thread.sleep(x in millis)一旦调用被REST API(B)限制

但是这将使所有的REST(A)呼叫等待那么长的时间(这将在1秒到59秒之间变化,并且可能会增加新呼叫的负载。

选项B

REST API(A)将有关节流的响应发送回Bolt。螺栓将处理失败通知Spout到

  • 不更改这些消息的偏移量
  • 告诉spout停止从Kafka读取信息并停止向Bolt发送消息。
  • Spout等待一段时间(例如一分钟),然后从它离开的地方恢复

选项A可以直接实施,但我认为不是一个好的解决方案。

[试图弄清选项B是否可用于topology.max.spout.pending,但是如何动态地与Storm进行通信以限制节流。任何人都可以对这个选项分享一些想法。

选项C

REST API(B)限制来自REST(A)的调用,该调用将不处理该调用,但会将429响应代码发送到螺栓。螺栓会将消息重新排队到另一个风暴拓扑的错误主题部分。此消息可以包含重试计数,如果同一消息再次受到限制,我们可以使用++ retry count重新进行排队。

更新帖子,找到使选项B可行的解决方案。

选项D

https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java

/**
 * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
 * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
 * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
 * <p/>
 * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
 * previous polled records in favor of processing more records.
 *
 * @param initialDelay      initial delay of the first retry
 * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
 * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
 * @param maxDelay          maximum amount of time waiting before retrying
 *
 */
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
    this.initialDelay = initialDelay;
    this.delayPeriod = delayPeriod;
    this.maxRetries = maxRetries;
    this.maxDelay = maxDelay;
    LOG.debug("Instantiated {}", this.toStringImpl());
}

步骤如下:

  1. 使用上述构造函数创建kafkaSpoutRetryService
  2. 使用以下选项将重试设置为KafkaSpoutConfigKafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
  3. 如果使用以下方法在Rest API(B)中出现节流,则将螺栓失败collector.fail(tuple),它将根据第1步和第2步中的重试配置设置发出信号,通知spout再次处理元组。

我有一个拓扑结构,其中spout从Kafka读取数据并发送到bolt,然后依次调用REST API(A)和另一个REST API(B)。到目前为止,API B还没有节流。现在他们有了...

timeout apache-storm throttling
1个回答
0
投票

您的选项D听起来不错,但是为了避免在对API A的调用中出现重复,我认为您应该考虑将拓扑分为两部分。

© www.soinside.com 2019 - 2024. All rights reserved.