Messages not sent to DLQ topic


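I want to use a DLQ for messages whose processing throws an exception.

Here is my application.yml. The topic is created successfully, but no failed messages show up in my DLQ topic.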

spring:
  cloud:
    stream:
      default:
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers:
            - localhost:9092
          consumer-properties:
            key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            specific.avro.reader: true
            enable.auto.commit: true
        bindings:
          resourceInventoryInput:
            consumer:
              autoCommitOffset: true
              autoCommitOnError: true
              enableDlq: true
              dlqName: dead-out
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      bindings:
        resourceInventoryInput:
          binder: kafka
          destination: ${application.messaging.topic}
          content-type: application/*+avro
          group: ${application.messaging.group}
      default-binder: kafka
avro spring-kafka spring-cloud-stream dead-letter
1 Answer
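Don't ask the same question in multiple places; it wastes your time and ours.

I already answered you on GitHub.

I just tested your YAML with Boot 2.1.15 and Greenwich.SR6 (and also with Boot 2.2.8 / Hoxton.SR5), and it works fine. The only changes I made were to rename the binding to input and to comment out the Avro settings.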

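import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Kbgh9181Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh9181Application.class, args);
    }

    // Always fails, so every record is routed to the DLQ once retries are exhausted.
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        throw new RuntimeException("foo");
    }

    // Reads the DLQ topic from the beginning and prints each dead-lettered message.
    @KafkaListener(id = "kbgh918", topics = "dead-out", properties = "auto.offset.reset:earliest")
    public void listen(Message<?> in) {
        System.out.println(in);
    }

}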

  
GenericMessage [payload=byte[3], headers={x-original-offset=[B@67917b81, x-original-partition=[B@467895cd, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=dead-out, kafka_offset=5, x-exception-message=[B@51def01e, x-exception-fqcn=[B@531d42e5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3fbc6674, x-original-topic=[B@3d684ab3, x-original-timestamp-type=[B@1b101300, kafka_receivedPartitionId=0, x-original-timestamp=[B@222370ed, kafka_receivedTimestamp=1592402977606, x-exception-stacktrace=[B@7e703d1b}]
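The DLQ headers in that output print as [B@... because their values are raw byte arrays. A minimal sketch of how they could be made readable follows. DlqHeaderDecoder is a hypothetical helper, not part of Spring or the binder. It assumes the textual headers (such as x-exception-message and x-original-topic) are UTF-8 strings, and that x-original-offset, x-original-timestamp and x-original-partition are big-endian 8-byte or 4-byte numbers. Check both assumptions against your binder version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of Spring): renders DLQ header bytes readably. */
public final class DlqHeaderDecoder {

    // Assumption: these headers carry a big-endian int (4 bytes) or long (8 bytes).
    private static final Set<String> NUMERIC_HEADERS = new HashSet<>(Arrays.asList(
            "x-original-offset", "x-original-partition", "x-original-timestamp"));

    private DlqHeaderDecoder() {
    }

    /** Decodes a byte[] value as UTF-8 text; any other value is rendered via String.valueOf. */
    public static String asText(Object value) {
        return value instanceof byte[]
                ? new String((byte[]) value, StandardCharsets.UTF_8)
                : String.valueOf(value);
    }

    /** Decodes a 4-byte int or an 8-byte long; rejects any other length. */
    public static long asNumber(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        if (value.length == Integer.BYTES) {
            return buffer.getInt();
        }
        if (value.length == Long.BYTES) {
            return buffer.getLong();
        }
        throw new IllegalArgumentException("Unexpected header length: " + value.length);
    }

    /** Returns a copy of the headers in which every value is a readable string. */
    public static Map<String, String> describe(Map<String, Object> headers) {
        Map<String, String> readable = new LinkedHashMap<>();
        headers.forEach((name, value) -> {
            boolean numeric = NUMERIC_HEADERS.contains(name) && value instanceof byte[];
            readable.put(name, numeric
                    ? Long.toString(asNumber((byte[]) value))
                    : asText(value));
        });
        return readable;
    }
}
```

Since MessageHeaders implements Map<String, Object>, the DLQ listener above could print DlqHeaderDecoder.describe(in.getHeaders()) instead of the raw message.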