Spring Integration Channel() 将消息分发给意外的订阅者

问题描述 投票:0回答:1

目前我面临 Spring IntegrationFlows 和 channel() 方法的一些问题。 我不确定我错过了什么,或者框架中是否存在错误。

这是流程:

@Configuration
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ImportFlow {


    ConsumerFactory<String, String> kafkaConsumerFactory;
    ServiceProperties serviceProperties;

    DebugHandler debugHandler;

    DebugSecondHandler secondHandler;

    TombstoneHandler tombstoneHandler;

    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlow.from(Kafka
                .messageDrivenChannelAdapter(
                    kafkaConsumerFactory,
                    ListenerMode.record,
                    serviceProperties.getImportTopic().getName())
            )
            .channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
            .get();
    }

    @Bean
    public IntegrationFlow routeKafkaMessage() {
        return IntegrationFlow.from("routeKafkaMessage.input")
            .route(Message.class,
                p -> {
                    if (p.getPayload().equals(KafkaNull.INSTANCE)) {
                        return processTombstoneFlow().getInputChannel();
                    }
                    return someSplittingFlow().getInputChannel();
                })
            .get();
    }

    @Bean
    IntegrationFlow someSplittingFlow() {
        return IntegrationFlow.from("someSplittingFlow.input")
            .handle(debugHandler)
            .channel(sendToFlow().getInputChannel())
            .get();
    }


    @Bean
    IntegrationFlow processTombstoneFlow() {
        return IntegrationFlow.from("processTombstoneFlow.input")
            .handle(tombstoneHandler)
            .channel(sendToFlow().getInputChannel())
        .handle(unexpectedHandler) // Whatever you do here besides a direct get() will lead to an error 
            // .nullChannel()
            .handle(m -> {})
            .get();
    }


    @Bean
    IntegrationFlow sendToFlow() {
        return IntegrationFlow.from("sendToFlow.input")
            .handle(secondHandler)
            .handle(m -> {}, e -> e.id("endOfSendToFlow"))
            .get();
    }

}

如果我向 kafka 发送 100 条带有 Payload 的消息,则 sendToFlow 中的 secondHandler 只会收到 50 条消息。 其他 50 条消息由 unexpectedHandler 接收 不知何故,我期望所有消息都会发送到 sendToFlow。

从日志中可以看出,每秒的消息都发送到 tombstoneFlow:

org.springframework.integration.channel.DirectChannel:在通道“bean”processTombstoneFlow.channel#0'上预发送

反之亦然。如果我发送 100 条带有 null (KafkaNull) 的消息,只有一半到达 sencondHandler

我期望有两个 DirectChannel 仅连接 someSplittingFlow<->sendToFlowprocessTombstoneFlow<->sendToFlow

我不明白的是 someSplittingFlowprocessTombstoneFlow 如何以及为何连接。

我还添加了第三个通道来将一些内容发送到sendToFlow。然后只有 33% 的消息到达了预期的渠道。

我把演示应用程序放在这里

spring-integration spring-integration-dsl
1个回答
0
投票

Spring Integration 中默认的

MessageChannel
类型是
DirectChannel
。例如,这是您
IntegrationFlow.from("sendToFlow.input")
的结果。该通道采用循环策略,这意味着订阅的消息处理程序将仅获取下一个订阅者的每条消息。

在您的情况下,

sendToFlow.input
sendToFlow()
那里获得了一名订阅者,基本上是您的
.handle(secondHandler)
。然后你在
processTombstoneFlow()
中执行此操作:

    .channel(sendToFlow().getInputChannel())
    .handle(unexpectedHandler)

这意味着您订阅此

unexpectedHandler
作为同一频道的第二个。因此你的循环行为。

我们需要更多地了解您的逻辑,以确定如何解决这种情况的一些步骤。

请参阅有关该通道的文档中的更多信息:https://docs.spring.io/spring-integration/reference/channel/implementations.html#channel-implementations-directchannel

© www.soinside.com 2019 - 2024. All rights reserved.