用JMS进行Spring Batch并行处理

问题描述 投票:0回答:1

我实现了一个spring批处理项目,从weblogic Jms队列中读取Jms消息(自定义项目读取器,非消息驱动),然后将Jms消息数据传递给一个项目写入器(chunk = 1),在那里我调用一些API并写入DataBase。

然而,我试图实现并行的Jms处理,在并行的Jms消息中读取,并将它们传递给写入器,而不需要等待之前的进程完成。

我在以前的项目中使用了DefaultMessageListenerContainer,它提供了一个并行消费Jms消息的功能,但是在这个项目中,我必须使用spring batch框架。

  • 我尝试使用最简单的解决方案(多线程步骤),但它没有工作,JmsException:"invalid blocking receive when anotherreceive is in progress",这意味着可能我的读者是满的。
  • 我想过使用远程分区,但是我必须读取所有的消息,并把数据放到步骤执行上下文中,然后再调用从属步骤,如果处理大量的消息,这并不是真正的效率。
  • 我看了一下远程分块,我知道它是通过队列通道来传递数据的,但我似乎找不到从Jms中读取消息并把消息放到本地队列中供从属工作者使用的效用。

我可以如何处理这个问题?

我的代码

 @Bean
    Step step1() {
        return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
                .reader(reader()).processor(processor()).writer(writer())
                .listener(stepListener()).build();
    }

    @Bean
    Job job(@Qualifier("step1") Step step1) {
        return jobs.get("job").start(step1).build();
    }

Jms代码 :

  @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);
        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
        consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");

    }

    @Override
    public Message receiveMessages() throws NamingException, JMSException {

        return consumer.receive(20000);

    }

项目阅读器 。

@Override
    public Message read() throws Exception {

        return jmsServiceReceiver.receiveMessages();
    }

谢谢!我会感激你的帮助:)

parallel-processing jms spring-batch weblogic spring-jms
1个回答
1
投票

有一个 BatchMessageListenerContainerspring-batch-infrastructure-tests 子项目。

https:/github.comspring-projectsspring-batchblobd8fc58338d3b059b67b5f777adc132d2564d7402spring-batch-infrastructure-testssrcmainjavaorgspringframeworkbatchcontainerjmsBatchMessageListenerContainer.java。

消息监听器容器,适应于通过配置提供的建议拦截消息接收。 要在单个事务中实现消息的批量化,请在建议链中使用TransactionInterceptor和RepeatOperationsInterceptor(在基类中设置或不设置事务管理器)。容器将不再接收单个消息并对其进行处理,而是使用RepeatOperations在同一线程中接收多个消息。与RepeatOperations和事务拦截器一起使用。如果事务拦截器使用XA,那么就使用XA连接工厂,否则就使用TransactionAwareConnectionFactoryProxy将JMS会话与正在进行的事务同步(开启了失败后重复消息的可能性)。在后一种情况下,你将不需要在基类中提供一个事务管理器--它只是在路上,防止JMS会话与数据库事务同步。

也许你可以根据你的用例来调整它。

© www.soinside.com 2019 - 2024. All rights reserved.