Spring cloud stream-初始化Kafka绑定程序时的通知

问题描述 投票:1回答:1

我的春季云流应用程序中有一个简单的Kafka生产者。在我的Spring应用程序启动时,我有一个@PostConstruct方法,该方法执行一些协调并尝试将事件发送到Kafka生产者。

问题是,我的卡夫卡生产者尚未准备好,当对帐开始将Enets发送到其中时,出现以下情况:

org.springframework.messaging.MessageDeliveryException:调度程序没有订阅者通道'orderbook-service-1.orderbook'。嵌套的异常是org.springframework.integration.MessageDispatchingException:调度程序没有订阅者,failedMessage = GenericMessage ..在org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)

在我的应用程序启动过程中,有没有一种方法可以通知Kafka通道已初始化,因此我只能将它发布到rec作业上。

这是我的代码段:

public interface OrderEventChannel {
    String TOPIC_BINDING = "orderbook";
    @Output(TOPIC_BINDING)
    SubscribableChannel outboundEvent();
}

@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}

@Service
public class OutgoingOrderKafkaProducer {

    @Autowired
    private OrderEventChannel orderEventChannel;

   public void onOrderEvent( ClientEvent clientEvent ) {

        try {
            Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
            SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
            subscribableChannel.send( kafkaMsg );
        } catch ( RuntimeException rte ) {
            log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
        }
    }
..
..

}

spring-boot apache-kafka spring-kafka spring-cloud-stream
1个回答
1
投票

@PostConstruct在上下文生命周期中还为时过早,无法开始使用Bean;它们仍在创建,配置和连接在一起。

您可以使用ApplicationListener(或@EventListener)来侦听ApplicationReadyEvent(请确保将偶数的applicationContext与主应用程序上下文进行比较,因为您可能会得到其他事件)。

您还可以实现SmartLifecycle并将代码放入start();将您的bean放置在较晚的Phase中,以便在所有连接完成后启动它。

输出绑定在阶段Integer.MIN_VALUE + 1000中开始,输入绑定在阶段Integer.MAX_VALUE - 1000中开始。

因此,如果您想在消息开始流传之前做一些事情,请在消息之间使用一个阶段(例如0,这是默认值。)>

© www.soinside.com 2019 - 2024. All rights reserved.