我正在为其动态创建云流和消费者,但由于类型信息丢失,无法正确定义消费者。
在下面的示例中,使用 @Bean 注释定义的消费者工作正常,它接收带有所有标头和有效负载的消息。 定义的消费者动态接收 byte[] 作为输入,因为当它创建时,有关泛型类型的信息会丢失,因为方法 registerSingleton(String beanName, Object singletonObject) 接收对象作为参数。
我尝试手动反序列化消息,但 spring 发送序列化的有效负载,而不是带有有效负载的消息,到动态创建的消费者,所以我无法访问标头。 这意味着我在动态消费者中接收序列化的 SitePayload,而不是序列化的 Message。
关于如何动态创建消费者以便它可以访问消息头有什么建议吗?
public class ConsumersCreator implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (int i = 0; i < 5; i++) {
beanFactory.registerSingleton("consumer" + i, createConsumer(i));
}
}
private Consumer<Message<SitePayload>> createConsumer(int id) {
return message -> log.info("Consumer id: {} message received: {}", id, message);
}
@Bean
public Consumer<Message<SitePayload>> consumer() {
return message -> log.error("Message received: {}", message);
}
}
我不确定为什么要动态创建消费者。我的意思是我无法设想用例,因为在典型的流应用程序中,必须具有一些用于绑定等的消费者/生产者属性。 .
无论如何,您所质疑的动态机制与 spring-core 相关,而不是与 s-c-function 相关。也就是说,我们确实提供了一种动态注册函数的机制,即通过
FunctionRegistration
类。
这是一个小样本(您可以在我们的测试用例中找到更多示例,特别是对于使用
ResolvableType
的泛型用例)
FunctionRegistry registry = applicationContext.getBean(FunctionRegistry.class)
FunctionRegistration registration = new FunctionRegistration(new MyFunction(), "a")
.type(FunctionTypeUtils.functionType(Integer.class, String.class));
registry.register(registration);
您将每个 FunctionRegistration 注册为一个 bean,我们将使用正确的类型信息来发现它。
public class DynamicConsumerConfig {
@Autowired
FunctionRegistry functionRegistry;
public void createDynamicConsumer(Consumer<String> consumer, String name) {
Type type = FunctionTypeUtils.discoverFunctionTypeFromClass(Consumer.class);
FunctionRegistration<Consumer<String>> registration = new FunctionRegistration<>(consumer, name).type(type);
functionRegistry.register(registration);
}
}
我尝试了一下,发现这个好像不行。 springcloud的版本是2021.0.8