Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。

问题描述 投票:0回答:1

我正在使用Spring Cloud Webflux和Spring Cloud Streams Functional Interfaces来处理我的kafka处理。

如果我按顺序进行处理,如果我杀了应用程序,它就会返回处理的消息,这与预期的一样,因为没有消息丢失。然而,如果我尝试做并行处理,它似乎在向Kafka确认,这是可以理解的,因为它现在是一个单独的线程,因此想转为手动确认。

我的代码。

  1. application.yml (相关部分)
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          minPartitionCount: 2         
      bindings:
          receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              consumer:
                  autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
  1. 收件人代码
public Consumer<Flux<Message<String>>> receiver() throws IOException {
        return (sink -> {
            sink
            .onBackpressureBuffer()
            .parallel(4)
            .runOn(Schedulers.parallel())
            .subscribe((record)->{  
                Flux<Action> executor = new 
                           //Internal code which does transformation and provides a flux for execution (names changed)
 IncomingMessage().process(record);

                if(executor != null) {
                    Disposable disposable=null;
                    disposable= executor.subscribe(
                            (action)->{

                                try {
                                   //Process execute does the processing on the modified data (names changed)
                                    Process.execute(action);
                                    Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                                    if(acknowledgment !=null) {
                                        acknowledgment.acknowledge();
                                    }
                                }
                                catch(Exception e) {

                                log.fatal(e.getMsg());
                                }
                            },
                            (e)->{

                                log.fatal(e.getMsg());
                                }
                            });
                    if(disposable != null) {
                        disposable.dispose();
                    }

                }

            });

        });
    }

这里的线路 record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 总是给出null,所以我假设 autoCommitOffset: false 是不工作的,我试着把下面的配置也放在绑定部分,但没有用。

 receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              autoCommitOffset: false

我的要求是,如果我杀死应用程序,即使在并行场景中,它也应该继续读取第一个未确认消息的消息。

kafka-consumer-api spring-webflux spring-cloud-stream
1个回答
0
投票

没有得到确认头的问题是由于标签的位置不正确。因为这纯粹是一个kafka binder属性。添加了以下属性

spring:
  cloud:
    stream:
      kafka:
        bindings:
           receiver-in-0:
              consumer:
                   autoCommitOffset: false

通道名称应该与函数调用中的通道名称相同,或者可以设置默认值。

 spring:
    cloud:
      stream:
        kafka:
           default:
            consumer:
               autoCommitOffset: false

然而并行处理传入的通量可能不是一个好主意,因为消息可能会被丢弃,因为一些后来的消息可以得到确认。这将需要更多的逻辑来确认,而不是仅仅设置参数确认。

© www.soinside.com 2019 - 2024. All rights reserved.