提高 Kafka Producer 性能:使用多个 Kafka 模板与具有动态主题的单个模板

问题描述 投票:0回答:1

我正在开发一个应用程序,需要将记录发送到同一 Kafka 集群中的不同主题。我已经探索了两种方法来实现这一目标,但我不确定它们对性能的影响。

具有动态主题的单个 Kafka 模板

我目前在应用程序中使用单个 Kafka 模板,在发送记录时动态指定主题。这是我的代码的简化版本:

class ProducerService {
    @Autowired
    private KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    public void send(String topic, GenericRecord key, GenericRecord value) {
        ListenableFuture<SendResult<GenericRecord, GenericRecord>> future = kafkaTemplate.send(topic, key, value);
    }
}

这种方法允许我通过动态传递主题来为多个主题重用相同的 Kafka 模板。但是,我担心它对性能的影响。

针对不同主题的多个 Kafka 模板:

或者,我正在考虑为每个主题使用单独的 Kafka 模板。这是我的实现方式
@Configuration
public class KafkaConfig {

    // Define topics
    @Value("${kafka.topic.first}")
    private String firstTopic;

    @Value("${kafka.topic.second}")
    private String secondTopic;

    @Bean(name = "firstKafkaTemplate")
    public KafkaTemplate<GenericRecord, GenericRecord> firstKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
        KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
        kafkaTemplate.setDefaultTopic(firstTopic);
        return kafkaTemplate;
    }

    // Second Kafka template bean definition follows similarly
}

class ProducerService {
    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<GenericRecord, GenericRecord> firstTopicTemplate;

    // Second Kafka template injection follows similarly

    public void send(String topic, GenericRecord key, GenericRecord value) {
        ListenableFuture<SendResult<GenericRecord, GenericRecord>> future;
        if ("first".equalsIgnoreCase(topic)) {
            future = firstTopicTemplate.sendDefault(key, value);
        } else if ("second".equalsIgnoreCase(topic)) {
            future = secondTopicTemplate.sendDefault(key, value);
        } else {
            throw new RuntimeException("Topic is not configured");
        }
    }
}

通过此设置,每个主题都有自己专用的 Kafka 模板。与动态主题方法相比,这种方法会产生更好的性能吗?

我了解Kafka内部执行批处理并通过单独的线程将批次发送到Kafka。考虑到这一点,哪种方法在性能方面会更有效?或者两种方法之间的性能差异可以忽略不计?

java spring apache-kafka kafka-producer-api
1个回答
0
投票

我根据吞吐量回答我自己的问题。当我处理记录时,我遇到了超时问题。

单一生产者在大多数情况下都是高效的

如果您因排队记录的速度比发送记录的速度快得多而遇到任何超时问题。然后调整以下参数以消除超时问题。请注意,这里我添加了虚拟值。您必须测试您的应用程序以获得应用程序所需的值。
spring.kafka.producer.properties.[linger.ms]=100
spring.kafka.producer.properties.[batch.size]=100000
spring.kafka.producer.properties.[request.timeout.ms]=30000
spring.kafka.producer.properties.[delivery.timeout.ms]=200000
请求超时.ms
生产者发送数据时等待服务器回复的时间由该参数控制。如果超时而没有回复,生产者将重试发送或响应错误(通过异常或发送回调)。
linger.ms
linger.ms 控制发送当前批次之前等待其他消息的时间量。当当前批次已满或达到 linger.ms 限制时,Kafka 生产者会发送一批消息。默认情况下,一旦有发送者线程可用于发送消息,生产者就会立即发送消息,即使批次中只有一条消息。通过将 linger.ms 设置为大于 0,我们指示生产者等待几毫秒以将其他消息添加到批次中,然后再将其发送到代理。这会增加延迟,但也会增加吞吐量(因为我们一次发送更多消息,每条消息的开销更少)。
批量大小
当多个记录被发送到同一个分区时,生产者会将它们一起批处理。此参数控制每个批次将使用的内存量(以字节为单位)(不是消息!)。当批次满时,该批次中的所有消息都会被发送。然而,这并不意味着生产者将等待批次变满。生产者将发送半满批次,甚至只发送一条消息的批次。因此,batch size设置太大不会导致消息发送延迟;它只会为批次使用更多内存。将批量大小设置得太小会增加一些开销,因为生产者需要更频繁地发送消息。
交付.超时.ms
调用 send() 返回后报告成功或失败的时间上限。这限制了记录在发送之前延迟的总时间、等待代理确认的时间(如果预期)以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽,或者记录被添加到达到较早交付到期期限的批次,则生产者可能会报告无法发送早于此配置的记录。此配置的值应大于或等于 request.timeout.ms 和 linger.ms 之和。

如果你仍然面临超时问题,那么你需要更多的生产者

通过增加相同kafka模板的线程来增加生产者

为此,当您创建生产者工厂时,您必须将以下 setProducerPerThread 启用为 True。

我添加了一个TaskExecutor来控制生产者的数量,因为生产者的数量=线程数


@Configuration
Public class Conf{

   @Bean("kafkaTaskExecutor")
    public TaskExecutor getKafkaAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(15);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("Kafka-Async-");
        return executor;
    }


  @Bean
    public KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
        if (producerFactory instanceof DefaultKafkaProducerFactory<GenericRecord, GenericRecord> defaultFactory) {
            defaultFactory.setProducerPerThread(true);
        }
        return new KafkaTemplate<>(producerFactory);
    }
}

不要更改您的 Kafka 代码。就让它一样吧。我们将创建一个新层以使其正常工作。

class AsyncProducer{

       @Autowired
       private KafkaProducer producer;

      @Value("${topic.name}")
      private String topic;

     @Autowired
     @Qualifier("kafkaTaskExecutor")
     private TaskExecutor taskExecutor;

      public void sendAsync(GenericRecord key, GenericRecord value){
          CompletableFuture.completeFuture(value).thenAcceptAsync( val->   producer.send(topic,key,value), taskExecutor);
     }

}

通过上述设置,最初将有 5 个生产者开始发送记录,当负载较高时,将增加到 15 个生产者

使用多个 Kafka 模板

如果您认为您仍然没有达到吞吐量,那么您可以尝试增加模板的数量。但实际上,我没有尝试这个,因为我用第二种方法得到了想要的结果。
© www.soinside.com 2019 - 2024. All rights reserved.