Spring Cloud动态消费者创建,消息类型丢失

问题描述 投票:0回答:2

我正在为其动态创建云流和消费者,但由于类型信息丢失,无法正确定义消费者。

在下面的示例中,使用 @Bean 注释定义的消费者工作正常,它接收带有所有标头和有效负载的消息。 定义的消费者动态接收 byte[] 作为输入,因为当它创建时,有关泛型类型的信息会丢失,因为方法 registerSingleton(String beanName, Object singletonObject) 接收对象作为参数。

我尝试手动反序列化消息,但 spring 发送序列化的有效负载,而不是带有有效负载的消息,到动态创建的消费者,所以我无法访问标头。 这意味着我在动态消费者中接收序列化的 SitePayload,而不是序列化的 Message。

关于如何动态创建消费者以便它可以访问消息头有什么建议吗?

public class ConsumersCreator implements BeanFactoryPostProcessor {
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

            for (int i = 0; i < 5; i++) {
                beanFactory.registerSingleton("consumer" + i, createConsumer(i));
            }
        }

    private Consumer<Message<SitePayload>> createConsumer(int id) {
        return message -> log.info("Consumer id: {} message received: {}", id, message);
    }

    @Bean
    public Consumer<Message<SitePayload>> consumer() {
        return message -> log.error("Message received: {}", message);
    }
}

spring-boot spring-cloud-stream spring-context
2个回答
0
投票

我不确定为什么要动态创建消费者。我的意思是我无法设想用例,因为在典型的流应用程序中,必须具有一些用于绑定等的消费者/生产者属性。 .

无论如何,您所质疑的动态机制与 spring-core 相关,而不是与 s-c-function 相关。也就是说,我们确实提供了一种动态注册函数的机制,即通过

FunctionRegistration
类。

这是一个小样本(您可以在我们的测试用例中找到更多示例,特别是对于使用

ResolvableType
的泛型用例)

FunctionRegistry registry = applicationContext.getBean(FunctionRegistry.class)
FunctionRegistration registration = new FunctionRegistration(new MyFunction(), "a")
                    .type(FunctionTypeUtils.functionType(Integer.class, String.class));
registry.register(registration);

您将每个 FunctionRegistration 注册为一个 bean,我们将使用正确的类型信息来发现它。


0
投票
public class DynamicConsumerConfig {

    @Autowired
    FunctionRegistry functionRegistry;

    public void createDynamicConsumer(Consumer<String> consumer, String name) {
        Type type = FunctionTypeUtils.discoverFunctionTypeFromClass(Consumer.class);
        FunctionRegistration<Consumer<String>> registration = new FunctionRegistration<>(consumer, name).type(type);
        functionRegistry.register(registration);
    }
}

我尝试了一下,发现这个好像不行。 springcloud的版本是2021.0.8

© www.soinside.com 2019 - 2024. All rights reserved.