Spring-Kafka并发属性

问题描述 投票:2回答:2

我正在使用Spring-Kafka编写我的第一个Kafka Consumer。看看框架提供的不同选项,并对它们有一些疑问。如果您已经参与过,有人可以在下面说明。

问题1:根据Spring-Kafka文档,有两种方法可以实现Kafka-Consumer; “您可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注释来接收消息”。有人能告诉我何时应该选择一个选项而不是另一个?

问题2:我选择了KafkaListener方法来编写我的应用程序。为此,我需要初始化容器工厂实例,并在容器工厂内部有控制并发的选项。只想仔细检查我对并发的理解是否正确。

假设,我有一个主题名称MyTopic,里面有4个分区。为了消费来自MyTopic的消息,我启动了我的应用程序的2个实例,这些实例是通过将并发设置为2来启动的。因此,理想情况下,根据kafka分配策略,2个分区应该转到consumer1,其他2个分区应该转到consumer2 。由于并发性设置为2,每个使用者是否会启动2个线程,并且将并行使用主题中的数据?如果我们并行消费,我们也应该考虑任何事情。

问题3 - 我选择了手动ack模式,而不是在外部管理偏移(不将其保存到任何数据库/文件系统)。那么我是否需要编写自定义代码来处理重新平衡,否则框架会自动管理它?我认为没有,因为我只是在处理完所有记录后才承认。

问题4:此外,使用手动ACK模式,哪个Listener会提供更多性能? BATCH消息侦听器或普通消息侦听器。我想如果我使用普通消息监听器,则在处理每个消息后将提交偏移量。

粘贴下面的代码供您参考。

批量确认消费者:

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

初始化容器工厂:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}
spring apache-kafka spring-kafka
2个回答
1
投票
  1. @KafkaListener是一个消息驱动的“POJO”,它添加了有效负载转换,参数匹配等内容。如果你实现MessageListener,你只能从Kafka获得原始的ConsumerRecord。见@KafkaListener Annotation
  2. 是的,并发表示线程数;每个线程创建一个Consumer;它们并行运行;在您的示例中,每个将获得2个分区。

如果我们并行消费,我们也应该考虑任何事情。

您的侦听器必须是线程安全的(没有共享状态或任何此类状态需要由锁保护。

  1. 目前还不清楚“处理再平衡事件”是什么意思。发生重新平衡时,框架将提交任何挂起的偏移量。
  2. 它没有任何区别;消息监听器。批量监听器只是一个偏好。即使使用MANUAL ackmode的消息监听器,也会在处理完所有轮询结果时提交偏移量。使用MANUAL_IMMEDIATE模式,可以逐个提交偏移量。

1
投票

Q1:

从文档中,

@KafkaListener注释用于将bean方法指定为侦听器容器的侦听器。该bean包含在MessagingMessageListenerAdapter中,该MessagingMessageListenerAdapter配置有各种功能,例如转换器以在必要时转换数据以匹配方法参数。

您可以使用“#{...}或属性占位符($ {...})使用SpEL在注释上配置大多数属性。有关详细信息,请参阅Javadoc。”

这种方法对于简单的POJO侦听器很有用,您不需要实现任何接口。您还可以使用注释以声明方式侦听任何主题和分区。您还可以返回收到的值,而在MessageListener的情况下,您将受到接口签名的约束。

Q2:

理想的是。如果您要使用多个主题,则会变得更复杂。默认情况下,Kafka使用RangeAssignor,它有自己的行为(你可以改变它 - 请参阅更多细节under)。

QA:

如果您的消费者死亡,将会重新平衡。如果您手动确认并且您的消费者在提交抵消之前死亡,那么您不需要做任何事情,Kafka会处理这个问题。但你可能会得到一些重复的消息(至少一次)

Q4:

这取决于你的“表现”是什么意思。如果您的意思是延迟,那么尽可能快地消耗每条记录将是最佳选择。如果要实现高吞吐量,则批量消耗更有效。

我使用Spring kafka和各种听众编写了一些示例 - 请查看this repo

© www.soinside.com 2019 - 2024. All rights reserved.