针对高流量 kafka 主题扩展 karafka 消费者实例

问题描述 投票:0回答:1

我正在使用 Rails 和 Karafka gem。在其中一个主题中,流量会很大,因此我向该主题添加了 10 个分区,还将 Karafka::App.config.concurrency 设置为 10。但这并没有给我带来巨大的时间差异。为了进一步扩展我们的消费者处理,我正在考虑在我们的暂存环境中添加单独的消费者实例。

我可以在 Karafka 中添加单独的消费者实例以进一步减少处理时间吗?

Kaafka gem 版本:2.1.11,Rails 版本:7.0.5

karafka.rb 文件看起来像:

class KarafkaApp < Karafka::App
  kafka_config = Settings.kafka
  max_payload_size = 7_000_000 # Setting max payload size as 7MB

  t1_topic = kafka_config['topics']['t1_topic']
  t2_topic = kafka_config['topics']['t2_topic']

  setup do |config|
    config.kafka = {
      'bootstrap.servers': ENV['KAFKA_BROKERS_SCRAM'],
      'max.poll.interval.ms': 1200000
    }
    config.client_id = kafka_config['client_id']
    config.concurrency = 10
    # Recreate consumers with each batch. This will allow Rails code reload to work in the
    # development mode. Otherwise Karafka process would not be aware of code changes
    config.consumer_persistence = !Rails.env.development?

    config.producer = ::WaterDrop::Producer.new do |producer_config|
      # Use all the settings already defined for consumer by default
      producer_config.kafka = ::Karafka::Setup::AttributesMap.producer(config.kafka.dup)

      # Alter things you want to alter
      producer_config.max_payload_size = max_payload_size
      producer_config.kafka[:'message.max.bytes'] = max_payload_size
    end
  end

  routes.draw do
    topic t1_topic.to_sym do
      consumer C1Consumer
    end

    topic t2_topic.to_sym do
      config(
        partitions: Karafka::App.config.concurrency
      )

      consumer C2Consumer
    end
  end
end
ruby-on-rails parallel-processing producer-consumer karafka
1个回答
2
投票

这里是 Karafka 作者。

您的描述需要包含一些关键信息,尤其是关于您的数据和处理性质的信息。如果没有这些,就不容易提供具体的建议,但我会尽力而为。

首先,请注意,在您所描述的情况下,在没有充分利用 CPU 的情况下,大量基于 IO 的操作可能只能实现每个进程十倍(或类似)的增长。例如,每秒处理一条消息意味着每秒处理 10 条消息的最佳情况,即使一切都按预期进行。有一些方法可以进一步增加这个,我将在下面给你一些解释。

了解 Karafka 的并行处理和排序保证至关重要。为了防止能力下降,并行数据轮询/接收和并行工作分发需要同步。

Kaafka 在最佳条件下每秒可以处理多达 100k 条消息。

另一点是 Karafka 如何在单个进程中管理多个分区。默认情况下,它通过将数据循环分区到单个队列中来减少 Kafka 连接数量。每个分区过多的预缓冲可能会阻碍并行处理,因为队列中的单个批次可能无法从多个分区提供足够的数据来实现有效的并行性。这种误解常常是由于与 Sidekiq 的比较而产生的。然而,Karafka 提供了几种缓解策略。

关于添加 10 个分区并将

Karafka::App.config.concurrency
设置为 10 而没有显着改善,如果预缓冲范围广泛,则这是预期的。在这种情况下,Kaafka 无法有效地利用所有线程,尤其是在处理积压工作时。这可能会随着更小的滞后而改变,从而导致更好的数据分布和增加的并行性。

在 Karafka 中添加单独的消费者实例确实可以减少处理时间。 Karafka 支持各种方法来增强并行性和性能,具体取决于您的工作性质、主题结构和数量。以下是一些选项:

Kaafka 的并发模型在这里得到了彻底的解释:https://karafka.io/docs/Concurrency-and-Multithreading/

此概述将为您提供增强 Karafka 设置的更清晰路径。

附注我们有一个 Slack,您可以从我和其他 Karafka 重度用户那里获得实时答案。人们以每秒数百条或数千条消息的规模使用它,并且在正确配置的情况下运行良好。

附注2. 如果您计划在某个时候将您的业务押在 Karafka 上,那么可能值得投资 Pro 产品,其中包括特定于应用程序的支持和架构咨询以及许多有用的功能。

© www.soinside.com 2019 - 2024. All rights reserved.