如何在kafka实例化视图中访问标头信息?

问题描述 投票:0回答:1

如何在实例化视图中访问自定义标题?我正在尝试在应用程序中构建一些自定义dlq逻辑,并希望基于标头信息构建重试机制。实际重试由调度程序触发,该调度程序应在实例化视图中查找这些标头信息。

以下是一些代码片段:

创建实体化视图:

@Slf4j
@EnableBinding(DlqBinding.class)
public class DlqRetryService {

    @StreamListener
    public void readTable(@Input(DlqBinding.DLQ_TOPIC) KTable<String, String> table) {
    }
}

public interface DlqBinding {

    String DLQ_TOPIC = "dlq";

    @Input(DLQ_TOPIC)
    KTable<?, ?> dlqInput();
}

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            dlq:
              consumer:
                materializedAs: currentDL
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

计划程序:

    public void processDL() {
        ReadOnlyKeyValueStore<Object, Object> currentDL = interactiveQueryService.getQueryableStore("currentDL", QueryableStoreTypes.keyValueStore());

        KeyValueIterator<Object, Object> all = currentDL.all();

        while (all.hasNext()) {
            KeyValue<Object, Object> next = all.next();
            log.info("Found Entry in currentDL: {}", next);
            // some retry logic would be here
        }
    }```
spring spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

我认为不可能使用交互式查询从物化视图中以这种方式访问​​标头。

不清楚您要重试什么。您是否要重新处理记录?您可以使用KStream和转换器/处理器API来封装这种逻辑。这是这种模式的蓝图。请记住,这是访问标头的一般方法。您的应用程序可能需要根据您的特定用例进行调整。

@StreamListener
public void processStream(@Input(DlqBinding.DLQ_TOPIC) KStream<String, String> stream) {

stream.process(() -> new Processor() {

            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object key, Object value) {
                final Headers headers = this.context.headers();
                final Iterable<Header> headerXyz = headers
                        .headers("HEADER_KEY");
                // iterate on the header information returned and perform 
                // your application specific logic here.
            }

            @Override
            public void close() {

            }
        });

}
© www.soinside.com 2019 - 2024. All rights reserved.