将 JdbcMessageStore 添加到 Spring Kafka 聚合器

问题描述 投票:0回答:1

如何将

JdbcMessageStore
设置为聚合器,以便它使用 RDBMS 而不是内存中的消息存储? 目前
AggregatorAnnotationPostProcessor
直接通过框架将
new SimpleMessageStore()
设置为
AggregatingMessageHandler

以下是配置,无需

JdbcMessageStore
即可按预期工作。

@Bean
public ConsumerFactory<?,?> kafkaConsumerFactory(KafkaProperties properties) {
    ConsumerProperties props = properties.buildConsumerProperties();
    return DefaultKafkaConsumerFactory<>(props);
}

@Bean
@InboundChannelAdapter(channel = "fromChannel", poller = @Poller(fixedDelay = "1000"))
public KafkaMessageSource<String, MyPojo> kafkaMessageSource(ConsumerFactory<String, MyPojo> cf) {
    ConsumerProperties props = new ConsumerProperties("topic.in");
    return new KafkaMessageSource<>(cf, props);
}

@Bean
public MessageChannel fromChannel() {
    return new DirectChannel();
}

@Aggregator(inputChannel = "fromChannel", outputChannel = "toChannel")
public List<MyPojo> aggregate(List<MyPojo> list) {
    //apply logic
    return newList;
}

@CorrelationStrategy
public Object correate(Message<MyPojo> message) {
    //apply logic
    //return correlationId; //String
}

@ReleaseStrategy
public boolean checkRelease(Message<MyPojo> message) {
    //apply logic
    //return canRelease; //boolean
}

@Bean
public ProducerFactory<?,?> kafkaProducerFactory(KafkaProperties properties) {
    ConsumerProperties props = properties.buildProducerProperties();
    return DefaultKafkaProducerFactory<>(props);
}

@Bean
@ServiceActivator(inputChannel= "toChannel")
public MessageHandler handler(KafkaTemplate<String, List<MyPojo>> kafkaTemplate) {
    KafkaProducerMessageHandler<String, List<MyPojo>> handler = new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("topic-out"));
    return handler;
}

@Bean
public MessageChannel toChannel() {
    return new DirectChannel();
}

@Bean
public MessageGroupStore messageGroupStore(DataSource dataSource) {
    return new JdbcMessageStore(dataSource);
}
spring-boot apache-kafka spring-integration spring-kafka spring-cloud-stream
1个回答
0
投票

为此,我们建议通过

AggregatorFactoryBean
使用更高级的配置:https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-annotations

    @Bean
    @ServiceActivator(inputChannel = "fromChannel")
    AggregatorFactoryBean aggregatorFactoryBean(MessageGroupStore messageGroupStore, MessageChannel toChannel) {
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        aggregatorFactoryBean.setMessageStore(messageGroupStore);
        aggregatorFactoryBean.setProcessorBean(...);
        aggregatorFactoryBean.setCorrelationStrategy(...);
        aggregatorFactoryBean.setReleaseStrategy(...);
        aggregatorFactoryBean.setOutputChannel(toChannel);
        return aggregatorFactoryBean;
    }
© www.soinside.com 2019 - 2024. All rights reserved.