如何在实例化视图中访问自定义标题?我正在尝试在应用程序中构建一些自定义dlq逻辑,并希望基于标头信息构建重试机制。实际重试由调度程序触发,该调度程序应在实例化视图中查找这些标头信息。
以下是一些代码片段:
创建实体化视图:
@Slf4j
@EnableBinding(DlqBinding.class)
public class DlqRetryService {
@StreamListener
public void readTable(@Input(DlqBinding.DLQ_TOPIC) KTable<String, String> table) {
}
}
public interface DlqBinding {
String DLQ_TOPIC = "dlq";
@Input(DLQ_TOPIC)
KTable<?, ?> dlqInput();
}
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
dlq:
consumer:
materializedAs: currentDL
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
计划程序:
public void processDL() {
ReadOnlyKeyValueStore<Object, Object> currentDL = interactiveQueryService.getQueryableStore("currentDL", QueryableStoreTypes.keyValueStore());
KeyValueIterator<Object, Object> all = currentDL.all();
while (all.hasNext()) {
KeyValue<Object, Object> next = all.next();
log.info("Found Entry in currentDL: {}", next);
// some retry logic would be here
}
}```
我认为不可能使用交互式查询从物化视图中以这种方式访问标头。
不清楚您要重试什么。您是否要重新处理记录?您可以使用KStream
和转换器/处理器API来封装这种逻辑。这是这种模式的蓝图。请记住,这是访问标头的一般方法。您的应用程序可能需要根据您的特定用例进行调整。
@StreamListener
public void processStream(@Input(DlqBinding.DLQ_TOPIC) KStream<String, String> stream) {
stream.process(() -> new Processor() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, Object value) {
final Headers headers = this.context.headers();
final Iterable<Header> headerXyz = headers
.headers("HEADER_KEY");
// iterate on the header information returned and perform
// your application specific logic here.
}
@Override
public void close() {
}
});
}