在Spring-Kafka中哪个方法和onPartitionsRevoked一样?

问题描述 投票:0回答:1

我知道在Spring-Kafka我们有以下方法:

void registerSeekCallback(ConsumerSeekCallback回调);

void onPartitionsAssigned(Map assignment,ConsumerSeekCallback callback);

void onIdleContainer(Map assignment,ConsumerSeekCallback回调);

但是它与AfterPartitionsRevoked中的本地ConsumerRebalanceListener方法做了同样的事情?

“此方法将在重新平衡操作开始之前和消费者停止获取数据之后调用。建议在此回调中将偏移提交到Kafka或自定义偏移存储以防止重复数据。”

如果我想实现ConsumerRebalanceListener,我如何传递KafkaConsumer引用?我只看到Spring-Kafka的消费者。

=========更新======

嗨,Gary,当我将RebalanceListener添加到ContainerProperties中时。我可以看到两种方法都被触发了。但是,我得到了异常,说“无法完成提交,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用poll()之间的时间比配置的max.poll长。 interval.ms,这通常意味着轮询循环花费了太多时间消息处理“你有什么想法吗?

===========更新2 ============

    public ConcurrentMessageListenerContainer<Integer, String> createContainer(
      ContainerProperties containerProps, IKafkaConsumer iKafkaConsumer) {

    Map<String, Object> props = consumerProps();

    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);

    **RebalanceListner rebalanceListner = new RebalanceListner(cf.createConsumer());**

    CustomKafkaMessageListener ckml = new CustomKafkaMessageListener(iKafkaConsumer, rebalanceListner);

    CustomRecordFilter cff = new CustomRecordFilter();

    FilteringAcknowledgingMessageListenerAdapter faml = new FilteringAcknowledgingMessageListenerAdapter(ckml, cff, true);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

    RetryTemplate rt = new RetryTemplate();
    rt.setBackOffPolicy(backOffPolicy);
    rt.setRetryPolicy(retryPolicy);
    rt.registerListener(ckml);
    RetryingAcknowledgingMessageListenerAdapter rml = new RetryingAcknowledgingMessageListenerAdapter(faml, rt);

    containerProps.setConsumerRebalanceListener(rebalanceListner);
    containerProps.setMessageListener(rml);
    containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProps.setErrorHandler(ckml);
    containerProps.setAckOnError(false);
    ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(
        cf, containerProps);

    container.setConcurrency(1);
    return container;
  }
apache-kafka spring-kafka
1个回答
1
投票

您可以将RebalanceListener添加到传递给构造函数的容器的ContainerProperties中。

© www.soinside.com 2019 - 2024. All rights reserved.