如何从集合中有效地向Kafka生成消息

问题描述 投票:1回答:1

在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中一个队列中的数据,并将其写入具有一个分区的Kafka主题。在使用了来自MQ的数据之后,消息有效负载被拆分为3000个较小的消息,这些消息存储在字符串序列中。然后,使用KafkaProducer将这3000条消息中的每条消息发送到Kafka(2.x版)。

您将如何发送这3000条消息?

我既不能增加IBM MQ中的队列数(不受我的控制),也不能增加主题中的分区数(消息的顺序是必需的,编写自定义分区程序将影响该主题的太多使用者)。

生产者设置当前为:

  • acks = 1
  • linger.ms = 0
  • batch.size = 65536

但是优化它们可能只是一个问题,而不是我当前问题的一部分。

当前正在执行

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
    val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
    val recordMetadata = future.get()
}

对我来说,这似乎不是最优雅,最有效的方法。是否有编程方式来提高吞吐量?


从@radai回答后编辑

感谢答案使我指向正确的方向,因此我仔细研究了不同的Producer方法。 《 Kafka-权威指南》一书列出了这些方法:

一劳永逸]我们会向服务器发送一条消息,并不关心消息是否成功到达。大多数时候,它会成功到达,因为Kafka的可用性很高,并且生产者将重试自动发送消息。但是,使用此方法会丢失一些消息。

同步发送

我们发送一条消息,send()方法返回一个Future对象,然后使用get()等待将来,看看send()是否成功。

异步发送

我们使用回调函数调用send()方法,该函数会在其回调时触发收到Kafka经纪人的回复

现在我的代码看起来像这样(省去了错误处理和Callback类的定义):

  val asyncProducer = new KafkaProducer[String, String](someProperties)

  for (msg <- messages) {
    val record = new ProducerRecord[String, String](someTopic, someKey, msg)
    asyncProducer.send(record, new compareProducerCallback)
  }
  asyncProducer.flush()

我已经比较了10000条非常小的消息的所有方法。这是我的测量结果:

  1. 即发即弃:173683464ns

  2. 同步发送:29195039875ns

  3. 异步发送:44153826ns

  4. 老实说,通过选择正确的属性(batch.size,linger.ms等),可能有更多的潜力来优化所有属性。

在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中一个队列中的数据,并将其写入具有一个分区的Kafka主题。从MQ消耗数据后,消息有效负载就会得到...

scala apache-kafka kafka-producer-api
1个回答
1
投票

我能看到您的代码运行缓慢的最大原因是,您在等待每个发送将来。

© www.soinside.com 2019 - 2024. All rights reserved.