我实现了一个spring批处理项目,从weblogic Jms队列中读取Jms消息(自定义项目读取器,非消息驱动),然后将Jms消息数据传递给一个项目写入器(chunk = 1),在那里我调用一些API并写入DataBase。
然而,我试图实现并行的Jms处理,在并行的Jms消息中读取,并将它们传递给写入器,而不需要等待之前的进程完成。
我在以前的项目中使用了DefaultMessageListenerContainer,它提供了一个并行消费Jms消息的功能,但是在这个项目中,我必须使用spring batch框架。
我可以如何处理这个问题?
我的代码
@Bean
Step step1() {
return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.listener(stepListener()).build();
}
@Bean
Job job(@Qualifier("step1") Step step1) {
return jobs.get("job").start(step1).build();
}
Jms代码 :
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");
}
@Override
public Message receiveMessages() throws NamingException, JMSException {
return consumer.receive(20000);
}
项目阅读器 。
@Override
public Message read() throws Exception {
return jmsServiceReceiver.receiveMessages();
}
谢谢!我会感激你的帮助:)
有一个 BatchMessageListenerContainer
在 spring-batch-infrastructure-tests
子项目。
消息监听器容器,适应于通过配置提供的建议拦截消息接收。 要在单个事务中实现消息的批量化,请在建议链中使用TransactionInterceptor和RepeatOperationsInterceptor(在基类中设置或不设置事务管理器)。容器将不再接收单个消息并对其进行处理,而是使用RepeatOperations在同一线程中接收多个消息。与RepeatOperations和事务拦截器一起使用。如果事务拦截器使用XA,那么就使用XA连接工厂,否则就使用TransactionAwareConnectionFactoryProxy将JMS会话与正在进行的事务同步(开启了失败后重复消息的可能性)。在后一种情况下,你将不需要在基类中提供一个事务管理器--它只是在路上,防止JMS会话与数据库事务同步。
也许你可以根据你的用例来调整它。