我正在使用 Rails 和 Karafka gem。在其中一个主题中,流量会很大,因此我向该主题添加了 10 个分区,还将 Karafka::App.config.concurrency 设置为 10。但这并没有给我带来巨大的时间差异。为了进一步扩展我们的消费者处理,我正在考虑在我们的暂存环境中添加单独的消费者实例。
我可以在 Karafka 中添加单独的消费者实例以进一步减少处理时间吗?
Kaafka gem 版本:2.1.11,Rails 版本:7.0.5
karafka.rb 文件看起来像:
class KarafkaApp < Karafka::App
kafka_config = Settings.kafka
max_payload_size = 7_000_000 # Setting max payload size as 7MB
t1_topic = kafka_config['topics']['t1_topic']
t2_topic = kafka_config['topics']['t2_topic']
setup do |config|
config.kafka = {
'bootstrap.servers': ENV['KAFKA_BROKERS_SCRAM'],
'max.poll.interval.ms': 1200000
}
config.client_id = kafka_config['client_id']
config.concurrency = 10
# Recreate consumers with each batch. This will allow Rails code reload to work in the
# development mode. Otherwise Karafka process would not be aware of code changes
config.consumer_persistence = !Rails.env.development?
config.producer = ::WaterDrop::Producer.new do |producer_config|
# Use all the settings already defined for consumer by default
producer_config.kafka = ::Karafka::Setup::AttributesMap.producer(config.kafka.dup)
# Alter things you want to alter
producer_config.max_payload_size = max_payload_size
producer_config.kafka[:'message.max.bytes'] = max_payload_size
end
end
routes.draw do
topic t1_topic.to_sym do
consumer C1Consumer
end
topic t2_topic.to_sym do
config(
partitions: Karafka::App.config.concurrency
)
consumer C2Consumer
end
end
end
这里是 Karafka 作者。
您的描述需要包含一些关键信息,尤其是关于您的数据和处理性质的信息。如果没有这些,就不容易提供具体的建议,但我会尽力而为。
首先,请注意,在您所描述的情况下,在没有充分利用 CPU 的情况下,大量基于 IO 的操作可能只能实现每个进程十倍(或类似)的增长。例如,每秒处理一条消息意味着每秒处理 10 条消息的最佳情况,即使一切都按预期进行。有一些方法可以进一步增加这个,我将在下面给你一些解释。
了解 Karafka 的并行处理和排序保证至关重要。为了防止能力下降,并行数据轮询/接收和并行工作分发需要同步。
Kaafka 在最佳条件下每秒可以处理多达 100k 条消息。
另一点是 Karafka 如何在单个进程中管理多个分区。默认情况下,它通过将数据循环分区到单个队列中来减少 Kafka 连接数量。每个分区过多的预缓冲可能会阻碍并行处理,因为队列中的单个批次可能无法从多个分区提供足够的数据来实现有效的并行性。这种误解常常是由于与 Sidekiq 的比较而产生的。然而,Karafka 提供了几种缓解策略。
关于添加 10 个分区并将
Karafka::App.config.concurrency
设置为 10 而没有显着改善,如果预缓冲范围广泛,则这是预期的。在这种情况下,Kaafka 无法有效地利用所有线程,尤其是在处理积压工作时。这可能会随着更小的滞后而改变,从而导致更好的数据分布和增加的并行性。
在 Karafka 中添加单独的消费者实例确实可以减少处理时间。 Karafka 支持各种方法来增强并行性和性能,具体取决于您的工作性质、主题结构和数量。以下是一些选项:
设置调整:一种可访问的方法是调整在https://karafka.io/docs/Librdkafka-Configuration/中找到的设置。例如,从默认的 1MB 减少
fetch.message.max.bytes
可以通过防止每个分区积累过多的数据来增加小消息的并行性。
运行 Swarm:Karafka 的 Swarm 模式,详细信息请参见 https://karafka.io/docs/Swarm-Multi-Process/,允许通过利用多个分叉和连接进行并行数据轮询。
独立订阅组:对不同主题使用多个订阅组可以改善所有主题之间的循环和排队。有关更多信息,请访问 https://karafka.io/docs/Routing/#multiple-subscription-groups-mode。
虚拟分区:这些增强了单个分区的数据处理并行性,显着提高了 IO 密集型任务的吞吐量。详细信息请参见https://karafka.io/docs/Pro-Virtual-Partitions/。
连接复用:Karafka 支持单主题、单进程设置的复用,详细描述见 https://karafka.io/docs/Pro-Multiplexing/。
长时间运行/非阻塞作业:这种方法在处理主题分区时不会阻止轮询,可以有效地循环分区。请参阅 https://karafka.io/docs/Pro-Long-Running-Jobs/ 了解限制和用例。
Kaafka 的并发模型在这里得到了彻底的解释:https://karafka.io/docs/Concurrency-and-Multithreading/。
此概述将为您提供增强 Karafka 设置的更清晰路径。
附注我们有一个 Slack,您可以从我和其他 Karafka 重度用户那里获得实时答案。人们以每秒数百条或数千条消息的规模使用它,并且在正确配置的情况下运行良好。
附注2. 如果您计划在某个时候将您的业务押在 Karafka 上,那么可能值得投资 Pro 产品,其中包括特定于应用程序的支持和架构咨询以及许多有用的功能。