目前我面临 Spring IntegrationFlows 和 channel() 方法的一些问题。 我不确定我错过了什么,或者框架中是否存在错误。
这是流程:
@Configuration
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ImportFlow {
ConsumerFactory<String, String> kafkaConsumerFactory;
ServiceProperties serviceProperties;
DebugHandler debugHandler;
DebugSecondHandler secondHandler;
TombstoneHandler tombstoneHandler;
@Bean
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlow.from(Kafka
.messageDrivenChannelAdapter(
kafkaConsumerFactory,
ListenerMode.record,
serviceProperties.getImportTopic().getName())
)
.channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
.get();
}
@Bean
public IntegrationFlow routeKafkaMessage() {
return IntegrationFlow.from("routeKafkaMessage.input")
.route(Message.class,
p -> {
if (p.getPayload().equals(KafkaNull.INSTANCE)) {
return processTombstoneFlow().getInputChannel();
}
return someSplittingFlow().getInputChannel();
})
.get();
}
@Bean
IntegrationFlow someSplittingFlow() {
return IntegrationFlow.from("someSplittingFlow.input")
.handle(debugHandler)
.channel(sendToFlow().getInputChannel())
.get();
}
@Bean
IntegrationFlow processTombstoneFlow() {
return IntegrationFlow.from("processTombstoneFlow.input")
.handle(tombstoneHandler)
.channel(sendToFlow().getInputChannel())
.handle(unexpectedHandler) // Whatever you do here besides a direct get() will lead to an error
// .nullChannel()
.handle(m -> {})
.get();
}
@Bean
IntegrationFlow sendToFlow() {
return IntegrationFlow.from("sendToFlow.input")
.handle(secondHandler)
.handle(m -> {}, e -> e.id("endOfSendToFlow"))
.get();
}
}
如果我向 kafka 发送 100 条带有 Payload 的消息,则 sendToFlow 中的 secondHandler 只会收到 50 条消息。 其他 50 条消息由 unexpectedHandler 接收 不知何故,我期望所有消息都会发送到 sendToFlow。
从日志中可以看出,每秒的消息都发送到 tombstoneFlow:
org.springframework.integration.channel.DirectChannel:在通道“bean”processTombstoneFlow.channel#0'上预发送
反之亦然。如果我发送 100 条带有 null (KafkaNull) 的消息,只有一半到达 sencondHandler。
我期望有两个 DirectChannel 仅连接 someSplittingFlow<->sendToFlow 或 processTombstoneFlow<->sendToFlow
我不明白的是 someSplittingFlow 和 processTombstoneFlow 如何以及为何连接。
我还添加了第三个通道来将一些内容发送到sendToFlow。然后只有 33% 的消息到达了预期的渠道。
我把演示应用程序放在这里
Spring Integration 中默认的
MessageChannel
类型是 DirectChannel
。例如,这是您 IntegrationFlow.from("sendToFlow.input")
的结果。该通道采用循环策略,这意味着订阅的消息处理程序将仅获取下一个订阅者的每条消息。
在您的情况下,
sendToFlow.input
从sendToFlow()
那里获得了一名订阅者,基本上是您的.handle(secondHandler)
。然后你在processTombstoneFlow()
中执行此操作:
.channel(sendToFlow().getInputChannel())
.handle(unexpectedHandler)
这意味着您订阅此
unexpectedHandler
作为同一频道的第二个。因此你的循环行为。
我们需要更多地了解您的逻辑,以确定如何解决这种情况的一些步骤。
请参阅有关该通道的文档中的更多信息:https://docs.spring.io/spring-integration/reference/channel/implementations.html#channel-implementations-directchannel