Reactor Kafka - At-Least-Once - 处理多分区中的故障和偏移

问题描述 投票:0回答:2

以下是从kafka主题(8分区)接收消息并对其进行处理的消费者代码。

    @Component
    public class MessageConsumer {

        private static final String TOPIC = "mytopic.t";
        private static final String GROUP_ID = "mygroup";
        private final ReceiverOptions consumerSettings;
        private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

        @Autowired
        public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
        {
            this.consumerSettings=consumerSettings;
            consumerMessage();
        }

        private void consumerMessage()
        {

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

        Flux.defer(receiver::receive)
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                        partitionFlux.publishOn(scheduler)
                                .concatMap(m -> {
                                    LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
                                    return process(m.key(), m.value())
                                            .thenEmpty(m.receiverOffset().commit());
                                }))
                .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
                .doOnError(err -> {
                    handleError(err);
                }).retry()
                .doOnCancel(() -> close()).subscribe();

    }

    private void close() {
    }

    private void handleError(Throwable err) {
        LOG.error("kafka stream error : ",err);
    }

    private Mono<Void> process(String key, String value)
    {
        if(key.equals("error"))
            return Mono.error(new Exception("process error : "));

        LOG.error("message consumed : "+key);
        return Mono.empty();
    }


    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .subscription(topics);
    }


}
    @Bean(name="consumerSettings")
    public ReceiverOptions<String, String> getConsumerSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put("max.block.ms", "3000");
        props.put("request.timeout.ms", "3000");

        return ReceiverOptions.create(props);
    }

收到每条消息后,如果消费的消息处理成功,我的处理逻辑将返回空单声道。

如果处理逻辑中没有返回错误,则一切都按预期工作。

但是,如果我抛出一个错误来模拟我的处理逻辑中的特定消息的异常行为,那么我缺少处理导致异常的消息。流移动到下一条消息。

我想要实现的是,处理当前消息并提交偏移量,如果成功则转移到下一条记录。

如果处理消息时出现任何异常,则不提交当前偏移量并重试相同的消息,直到成功为止。在当前消息成功之前,请勿移至下一条消息。

请告诉我如何在不跳过消息的情况下处理进程故障,并使流从抛出异常的偏移量开始。

问候,

Vinoth

apache-kafka offset spring-webflux project-reactor partition
2个回答
0
投票

创建不同的消费者组。

每个使用者组都与一个数据库相关。

创建您的消费者,以便他们只处理相关事件并将其推送到相关数据库。如果数据库已关闭,则配置使用者重试无限的时间。出于任何原因,如果您的消费者死亡,那么请确保他们从早期消费者离开的地方开始。在将数据提交到数据库并将ack发送到kafka代理之后,您的消费者很可能会死亡。您需要更新使用者代码以确保您只准确处理一次消息(如果需要)。


0
投票

以下代码适合我。我们的想法是重试失败的消息配置的时间,如果仍然失败,则将其移至失败的队列并提交消息。同时处理来自其他分区的消息。

如果来自特定分区的消息在配置的时间内失败,则在延迟后重新启动流,以便我们可以通过不连续命中它们来处理依赖性故障。

@Autowired
public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings,MessageProducer producer)
{
    this.consumerSettings=consumerSettings;
    this.fraudCheckService=fraudCheckService;
    this.producer=producer;
    consumerMessage();
}

private void consumerMessage() {

    int numRetries=3;

    Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

    KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

    Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition());

    Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
            Flux.just(b)
                    .concatMap(a -> {
                        LOG.error("processing message - order: {} offset: {} partition: {}",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition());

                        return process(a.key(), a.value()).
                                then(a.receiverOffset().commit())
                                .doOnSuccess(d -> LOG.info("committing  order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()))
                                .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()));
                    })
                    .retryWhen(companion -> companion
                            .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} " , b.key() , b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition(),s.getMessage()))
                            .zipWith(Flux.range(1, numRetries), (error, index) -> {
                                if (index < numRetries) {
                                    LOG.info(" --> Retying {} order: {} offset: {} partition: {} ", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    return index;
                                } else {
                                    LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    producer.sendMessages(ERROR_TOPIC,b.key(),b.value());
                                    b.receiverOffset().commit();
                                    //return index;
                                    throw Exceptions.propagate(error);
                                }
                            })
                            .flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 3)))
                            .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
                    ))
    );

    f1.doOnError(a ->  {
                LOG.info("Moving to next message because of : ", a);
                try {

                    Thread.sleep(5000); // configurable
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

    ).retry().subscribe(); 

}

public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
    return consumerSettings
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .subscription(topics);
}

private Mono<Void> process(OrderId orderId, TraceId traceId)
{
    try {

        Thread.sleep(500); // simulate slow response
    } catch (InterruptedException e) {
        // Causes the restart
        e.printStackTrace();
    }

   if(orderId.getId().startsWith("error")) // simulate error scenario
        return Mono.error(new Exception("processing message failed for order: " + orderId.getId()));

    return Mono.empty();
}
© www.soinside.com 2019 - 2024. All rights reserved.