Spring集成java dsl错误,发现参数类型不明确

问题描述 投票:0回答:1

我是spring集成dsl的新手。

作为学习的一部分,我将一种XML转换为Java DSL配置

这里是xml配置的链接。

https://dzone.com/articles/spring-integration-building

这是我的Java DSL配置。

package study.pattern.integration.lab9.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;

import study.pattern.integration.lab9.domain.ItemType;
import study.pattern.integration.lab9.domain.Order;
import study.pattern.integration.lab9.domain.OrderItem;
import study.pattern.integration.lab9.service.OrderDelivery;
import study.pattern.integration.lab9.service.OrderItemsProcessor;

@EnableIntegration
@Configuration
@IntegrationComponentScan
public class IntegrationConfiguration {

    @Autowired
    OrderItemsProcessor processor;

    @Autowired
    OrderDelivery orderDelivery;

    @Bean
    @Qualifier("orderChannel")
    public DirectChannel orderChannel() {
        return MessageChannels.direct().get();
    }   

    @Bean
    @Qualifier("orderItemsChannel")
    public DirectChannel orderItemsChannel() {
        return MessageChannels.direct().get();
    }


    /**
     * 
     * 
     * QueueChannel: Implements PollableChannel. 
     * There’s one endpoint connected to the channel, no subscribers. This communication is asynchronous; 
     * the receiver will retrieve the message through a different thread. How it works:
        The producer sends the message to the channel.
        The channel queues the message.
        The consumer actively retrieves the message (active receiver).
     * @return
     */


    @Bean
    @Qualifier("musicItemsChannel")
    public QueueChannel musicItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("softwareItemsChannel")
    public QueueChannel softwareItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("booksItemChannel")
    public QueueChannel booksItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("orderItemsProcessed")
    public DirectChannel orderItemsProcessedChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    @Qualifier("orderDelivered")
    public DirectChannel orderDeliveredChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    @ServiceActivator(inputChannel = "myLogChannel")
    public MessageHandler logger() {
         LoggingHandler loggingHandler =  new LoggingHandler(LoggingHandler.Level.INFO.name());
         loggingHandler.setLoggerName("logging");
         return loggingHandler;
    }


    @Bean
    IntegrationFlow processOrderFlow() {
        return IntegrationFlows
                .from(orderChannel())
                    .split(Order.class , a -> a.getOrderItems())
                    .channel(orderItemsChannel())
                    .wireTap(f -> f.handle(logger()))
                    .route(OrderItem.class,
                            o -> o.getType().name(),
                            type -> type.channelMapping(ItemType.BOOK.name(),booksItemsChannel())
                            .channelMapping(ItemType.MUSIC_CD.name(), musicItemsChannel())
                            .channelMapping(ItemType.SOFTWARE.name(), softwareItemsChannel())
                            )
        .get();
    }

    @Bean   
    IntegrationFlow processBooksItemChannel() {
        return IntegrationFlows.from(booksItemsChannel())
            .handle(processor,"processBooksOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
            .channel(orderItemsProcessedChannel())
            .wireTap(f -> f.handle(logger()))
            .log()
            .get();

    }

    @Bean
    IntegrationFlow processMusicItemChannel() {
        return IntegrationFlows.from(booksItemsChannel())
            .handle(processor,"processMusicOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
            .channel(orderItemsProcessedChannel())
            .wireTap(f -> f.handle(logger()))
            .log()
            .get();

    }

    @Bean
    IntegrationFlow processSoftwareItemChannel() {
        return IntegrationFlows.from(booksItemsChannel())
            .handle(processor, "processSoftware", spec -> spec.poller(Pollers.fixedDelay(100l)))
            .channel(orderItemsProcessedChannel())
            .wireTap(f -> f.handle(logger()))
            .log()
            .get();
    }

    @Bean
    IntegrationFlow aggreateAllProcessedOrderItems() {
        return IntegrationFlows.from(orderItemsProcessedChannel())
                .aggregate(spec -> spec.processor(orderDelivery, "delivery").get()) 
                .handle(orderDeliveredChannel())
                .wireTap(f -> f.handle(logger()))
                .log()
                .get();
    }
}

在OrderDelivery类方法下面。

public class OrderDelivery {

    public Order delivery(GenericMessage<List<OrderItem>> orderItems) {
        log.info("Delivery of the order /......");
        final Order order = new Order();
        orderItems.getPayload().stream().forEach(orderItem -> order.addItem(orderItem));
        return order;
    }
}

当我运行我的代码时,出现以下异常。

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'aggreateAllProcessedOrderItems' defined in class path resource [study/pattern/integration/lab9/config/IntegrationConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'aggreateAllProcessedOrderItems' threw exception; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface org.springframework.messaging.MessageHandler] for method match: [public void org.springframework.integration.channel.AbstractMessageChannel.setInterceptors(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public static java.util.UUID org.springframework.integration.context.IntegrationObjectSupport.generateId(), public void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver), public final void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression), public void org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class[]), public org.springframework.messaging.support.ChannelInterceptor org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(int), public void org.springframework.integration.channel.AbstractMessageChannel.registerMetricsCaptor(org.springframework.integration.support.management.metrics.MetricsCaptor), public boolean org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public boolean org.springframework.integration.channel.AbstractSubscribableChannel.subscribe(org.springframework.messaging.MessageHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory), public void org.springframework.integration.channel.AbstractMessageChannel.setShouldTrack(boolean), public void org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter)]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:645) ~[spring-beans-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:475) ~[spring-beans-5.2.1.RELEASE.jar:5.2.1.RELEASE]

我尝试调试,但没有发现任何线索导致此问题的代码错误。 (我尝试使用Message和GenericMessage接口包装传递方法参数,仍然是相同的问题)

有人可以帮助我解决此问题。



spring spring-boot spring-integration spring-integration-dsl
1个回答
0
投票

您的错误在这里:

handle(orderDeliveredChannel())

该方法在哪里:

@Bean
@Qualifier("orderDelivered")
public DirectChannel orderDeliveredChannel() {
    return MessageChannels.direct().get();
}

哪个是MessageChannel bean。

handle()与某些服务方法调用或某些MessageHandler实现有关。

[不确定您为什么尝试将服务呼叫与消息通道混在一起。请修改有关该频道使用情况的逻辑。

© www.soinside.com 2019 - 2024. All rights reserved.