我仍在与 Spring Integration 作斗争 - 这是我的场景:
将有多个 Web 服务将消息全部传送到该队列中,我需要确保它们真正按照接收顺序进行处理。
我需要将 Spring Integration 的哪些部分连接在一起?
我无法帮助您使用 Spring Integration,但也许您需要重新考虑一下您的架构。在 ESB 系统中,当您知道消息的处理将花费相当长的时间或者不确定远程端是否已准备好(另一个原因是桥接不兼容的组件)时,您通常会将消息放入队列中。当您将消息添加到队列中时,您立即返回给请求者,指示消息已收到,但不提供操作结果。然后,请求者需要轮询结果,或者您也可以提供某种“推送”功能。
因此,如果处理队列中的消息需要花费大量时间,我建议修改您的架构。 Web 客户端等待长时间回复的情况并不常见,而且许多请求也可能会超时。
另一方面,如果消息的处理快速且可靠,则不需要使用队列通道。让所有消息与中心组件(Java EE 会话 Bean、Spring Bean、Web 服务)进行通信,并自行实现队列机制。已经有一些答案涵盖了如何做到这一点。
基于 QueueChannel 的 Javadoc,这是我的尝试。这并不涉及 Web 服务配置,只是涉及 Web 服务后端实现中的代码。
这是将某些内容添加到队列(您的网络服务)中的代码。
public class TheWebService {
// Could also use QueueChannel, or PollableChannel here instead
// just picked the most general one
private org.springframework.integration.channel.MessageChannel queue;
public void yourWebServiceMethod(SomeArg arg) {
SomeObjectToPassThatExtendsMessage passed = someInitialProcessing(arg);
queue.send(passed);
}
}
这是您的接收器/处理器/出队类中的代码
public class TheProcessor {
// Could also use QueueChannel here instead
// just picked the most general one
private org.springframework.integration.channel.PollableChannel queue;
// This method needs to be setup to be called by a separate thread.
// See http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/scheduling/package-summary.html
// and it's sub-packages.
public void someProcessingPoller() {
SomeObjectToPassThatExtendsMessage passed = queue.receive();
// Do some processing with the passed object.
}
}
Spring 配置看起来像这样
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<bean id="webService" class="mypackage.TheWebService">
<property name="queue" ref="queue" />
</bean>
<bean id="processor" class="mypackage.TheProcessor ">
<property name="queue" ref="queue" />
</bean>
<bean id="queue" class="org.springframework.integration.channel.QueueChannel"/>
</beans>
请注意 Spring Integration,但 Java 5 有许多可以处理 FIFO 的 BlockingQueue。
您应该查看 Spring Integration 中的 http(对于 REST)或 ws(对于 POX/SOAP)“入站网关”元素。任何一个都可以通过“请求通道”属性连接到共享的、队列支持的通道(可以在后台处理通过同一网关返回回复的路由)。我建议首先浏览示例。这个博客应该可以帮助您启动和运行:http://blog.springsource.com/2010/09/29/new-spring-integration-samples/
希望有帮助。 -马克
问题不在于弹簧。我认为您将需要一个队列,其中的元素包含请求并提供响应。但响应需要阻塞,直到元素被出列并处理。所以队列元素看起来像:
public class BlockingPair {
private final RequestBodyType request;
private ResponseBodyType response;
public BlockingPair(RequestBodyType request) {
this.request = request;
}
public RequestBodyType getRequest() {
return request;
}
public ResponseBodyType getResponse() {
while (response == null) {
Thread.currentThread().sleep(10);
}
return response;
}
public void setResponse(ResponseBodyType response) {
this.response = response;
}
}
Web 服务排队使用其请求正文创建 BlockingPair。然后将 BlockingPair 元素推入队列。然后,它创建响应并从 BlockingPair 获取响应正文,但会阻塞。
消费者将一个BlockingPair放入队列并设置响应主体。从那里,网络服务继续写入响应。
您需要三个 bean:webservice、阻塞队列和消费者。 Web 服务和消费者都需要队列作为 bean 属性。
队列和消费者 bean 需要在应用程序上下文中进行规划(由
ContextLoaderListener
初始化)。队列需要一个 bean id 才能被 Web 服务引用(它有自己的上下文,但应用程序上下文作为父级,因此可以引用队列引用):
部分
web.xml
:
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:/WEB-INF/applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>service</servlet-name>
<servlet-class>org.springframework.ws.transport.http.MessageDispatcherServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>service</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
applicationContext.xml
包含两个豆子:
<bean id="queue" class="java.util.concurrent.LinkedBlockingQueue"/>
<bean id="consumer" class="...">
<property name="queue" ref="queue"/>
</bean>
Web服务有自己的上下文定义,这里
service-servlet.xml
:
<bean id="endpoint" class="org.springframework.ws.server.endpoint....PayloadEndpoint">
<property name="queue" ref="queue"/>
</bean>
有关定义 spring ws 端点的更多信息,请参阅 spring 教程。