Using SendToDlqAndContinue in a function without a Processor (spring-cloud-stream-binder-kafka-streams)

Problem description

I am trying to send a message to a DLQ when a message processed in my function does not satisfy certain business rules.

For context, here is the relevant configuration in my application.yaml:

function:
  definition: filterConsumption|EnrichConsumption;
#Functions and topic binding
stream:
  bindings:
    filterConsumptionEnrichConsumption-in-0:
      destination: input
    filterConsumptionEnrichConsumption-out-0:
      destination: output
  kafka:
    streams:
      bindings:
        filterConsumptionEnrichConsumption-in-0:
          consumer:
            enable-dlq: true
            dlqName: input_dlq
            application-id: input-application-id
        filterConsumptionEnrichConsumption-out-1:
          consumer:
            enable-dlq: false
            application-id: output-application-id
      binder:
        #Kafka consumer config
        replicationFactor: ${KAFKA_STREAM_REPLICATION_FACTOR:1}
        brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}
        deserialization-exception-handler: sendToDlq

My function looks like this:

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {

    return input ->
            input.filter((key, consumptions) -> !getSomething(consumptions).orElse("").isBlank())
                    .merge(
                            //filter consumptions having a tradingName
                            input.filter((key, consumptions) -> getSomething(consumptions).orElse("").isBlank())
                                    //enrich consumptions with missing tradingName
                                    .mapValues(this::setSomething)
                    );

}
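Stripped of the Kafka Streams API, the branch-and-merge above amounts to: pass through records that already carry the value, enrich those that do not, and combine both groups. A plain-Java illustration with placeholder data (the literal "enriched" stands in for the result of setSomething):

```java
import java.util.ArrayList;
import java.util.List;

public class BranchMergeSketch {

    // Records with a non-blank value pass through unchanged; blank ones are
    // "enriched" (the setSomething branch); both groups are combined,
    // mirroring the two KStream#filter branches joined by KStream#merge.
    static List<String> process(List<String> records) {
        List<String> out = new ArrayList<>();
        for (String r : records) {
            if (!r.isBlank()) {
                out.add(r); // pass-through branch
            }
        }
        for (String r : records) {
            if (r.isBlank()) {
                out.add("enriched"); // placeholder for setSomething(r)
            }
        }
        return out;
    }
}
```

Every input record ends up in the output exactly once, which is also what the two complementary filters plus merge guarantee in the KStream version.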

During "setSomething", exceptions can be thrown because of business rules.

I tried two things. First, StreamBridge, but I keep getting the following error:

Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

I configured the StreamBridge like this:

private final StreamBridge streamBridge;

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
    return input ->
        input.filter((key, consumptions) ->
                !getSomething(consumptions).orElse("").isBlank())
            .merge(
                //filter consumptions having a tradingName
                input.filter((key, consumptions) ->
                        getSomething(consumptions).orElse("").isBlank())
                    //enrich consumptions with missing tradingName
                    .mapValues((ConsumptionSchema value) -> {
                        try {
                            return setSomething(value);
                        } catch (DlqException e) {
                            Message<ConsumptionSchema> mess = MessageBuilder.withPayload(value).build();
                            streamBridge.send("filterConsumptionEnrichConsumption-in-1", mess);
                            return null;
                        }
                    })

            );
}

I also tried SendToDlqAndContinue, with a Processor like this:

@Autowired
private final SendToDlqAndContinue dlqHandler;

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {

    return input ->
        input.process(() -> new Processor<String, ConsumptionSchema, String, ConsumptionSchema>() {

            ProcessorContextImpl context;

            @Override
            public void init(ProcessorContext context) {
                this.context = (ProcessorContextImpl) context;
            }

            @Override
            public void process(Record<String, ConsumptionSchema> processInput) {
                input.filter((key, consumptions) ->
                        !getSomething(consumptions).orElse("").isBlank())
                    .merge(
                        //filter consumptions having a tradingName
                        input.filter((key, consumptions) ->
                                getSomething(consumptions).orElse("").isBlank())
                            //enrich consumptions with missing tradingName
                            .mapValues((ConsumptionSchema value) -> {
                                try {
                                    return setSomething(value);
                                } catch (DlqException e) {
                                    log.error("Exception during handling of consumption message : {}, message : {}",
                                        processInput.key(), e.getMessage());
                                    dlqHandler.sendToDlq(
                                        new ConsumerRecord<>(
                                            context.topic(),
                                            context.partition(),
                                            context.offset(),
                                            processInput.key(),
                                            processInput.value()), e);
                                    return null;
                                }
                            })

                    );
            }
        });
}
In this case, I don't understand why, but the process method never seems to be invoked.

Can anyone help me get this working, either with SendToDlqAndContinue (preferred) or with StreamBridge?

Edit:

Using the same application.yaml as in the first part, I tried DltAwareProcessor:

@Configuration
@Data
@Slf4j
public class BillableConsumptionFilterStreaming {

    @Bean("filterConsumption")
    public Function<KStream<String, ConsumptionSchema>,
        KStream<String, ConsumptionSchema>> filterConsumption(DltSenderContext dltSenderContext) {
        return input ->
            input.process(() ->
                new DltAwareProcessor<>(
                    (BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (s, c) -> {
                        throw new RuntimeException("Exception that won't kill stream");
                    }, "input_dlq", dltSenderContext));
    }
}

Using breakpoints, I can see that the DltAwareProcessor is correctly invoked, down to this line: streamBridge.send(this.dltDestination, r.value());

No exception is thrown, but I get the following logs:

Using kafka topic for outbound: input_dlq
Node -1 disconnected.
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

In my application, BOOTSTRAP_SERVERS_CONFIG is overridden with the address of our Kafka cluster, and when no exception occurs, messages are correctly routed to the output topic. So maybe I am missing some configuration in application.yaml to set the broker for StreamBridge.
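One thing worth checking: StreamBridge publishes through the regular message-channel Kafka binder, which reads its broker list from a different property than the Kafka Streams binder configured above. A hedged guess at the missing piece (assuming the standard property names; adjust to your environment):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Brokers for the message-channel binder used by StreamBridge;
          # separate from spring.cloud.stream.kafka.streams.binder.brokers
          brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}
```

If only the streams binder's brokers are set, this would explain why the DLT publish falls back to localhost:9092 even though the stream itself reaches the real cluster.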

spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka dlq
1 Answer

SendToDlqAndContinue is built explicitly for deserialization exception handling. You cannot use it for runtime error handling the way you are attempting. We recently introduced a new feature (on the 4.1.0-SNAPSHOT line) that may help with this use case. See the following issues for more details: https://github.com/spring-cloud/spring-cloud-stream/issues/2779 and https://github.com/spring-cloud/spring-cloud-stream/issues/2802

Here is the documentation for this feature: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html#runtime-error-handling

See if this works for your use case. If you find any room for improvement in this feature, please comment on issue 2802 above; we can still make changes, since we will release 4.1.0-RC1 later this month.
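Applied to the question's use case, that runtime error-handling feature would mean moving the business logic into the DltAwareProcessor delegate, so that a DlqException is caught by the processor and the failed record is published to the DLT instead of failing the stream. A rough sketch, reusing the constructor shape from the question's edit (the exact delegate signature may differ between 4.1.0-SNAPSHOT and the final release, so treat this as an assumption to verify against the linked docs):

```java
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>>
        enrichConsumption(DltSenderContext dltSenderContext) {
    return input -> input.process(() -> new DltAwareProcessor<>(
        (BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (key, value) -> {
            // setSomething may throw DlqException; DltAwareProcessor recovers
            // the failed record and publishes it to "input_dlq" via StreamBridge
            return KeyValue.pair(key, setSomething(value));
        }, "input_dlq", dltSenderContext));
}
```

This keeps the happy path on the output topic while the DLT publish is handled by the processor's built-in recoverer, which is exactly the point where the broker-configuration issue above shows up.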
