无数据源轮询的MessageChannelPartionHandler堆栈跟踪服务

问题描述 投票:0回答:1

我们将大量的Spring Batch分区作业从2.2迁移到4.x(XML Configuration Of Partition Jobs)。当我调试分区作业时,我得到了这个堆栈跟踪。

2020-05-13 17:03:38,311 ERROR [org.springframework.batch.core.step.AbstractStep] (SimpleAsyncTaskExecutor-1) Encountered an error executing step step.partitionStep in job job.partitioned: java.lang.ClassCastException: class org.springframework.batch.core.StepExecution cannot be cast to class java.util.Collection (org.springframework.batch.core.StepExecution)
    at deployment.org.springframework.batch.integration.partition.MessageChannelPartitionHandler.receiveReplies(MessageChannelPartitionHandler.java:293)
    at deployment.org.springframework.batch.integration.partition.MessageChannelPartitionHandler.handle(MessageChannelPartitionHandler.java:232)
    at deployment.org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
    at deployment.org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203)
    at deployment.org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)

启用日志后,我看到前4条消息进入聚合器 最后一条进入MessageChannelPartitionHandler。

2020-05-13 15:04:40,066 DEBUG [AggregatingMessageHandler] (task-scheduler-3) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=850, version=3, name=step:4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=4, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403876387, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=ad86a496-1cb0-08d7-801f-e7c582a51ee5, jms_messageId=ID:5a125750-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403876412}]
    2020-05-13 15:04:46,072 DEBUG [AggregatingMessageHandler] (task-scheduler-10) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=847, version=3, name=step:3, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=0, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403885895, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=a066dcca-27b4-ad87-b639-a7bf15125f1c, jms_messageId=ID:5fbd2591-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403885907}]
    2020-05-13 15:04:52,073 DEBUG [AggregatingMessageHandler] (task-scheduler-2) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=846, version=3, name=step:2, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403888831, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=a25ce9fa-9f7c-f12e-1495-a7e91e12f43a, jms_messageId=ID:617d2512-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403888860}]
    2020-05-13 15:04:58,074 DEBUG [AggregatingMessageHandler] (task-scheduler-5) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=849, version=3, name=step:1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=2, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403893969, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=98f5c7f8-0133-4f74-3666-0b1d36055341, jms_messageId=ID:648d2433-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403893980}]
    2020-05-13 15:05:19,271 DEBUG [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] (SimpleAsyncTaskExecutor-1) Received replies: GenericMessage [payload=StepExecution: id=848, version=3, name=step:0, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=3, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403876047, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=093e2d94-1911-1b4b-6334-d7d3032438ad, jms_messageId=ID:59de760f-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403876109}]

我的配置如下。

<!-- Partition Handler -->
<bean id="partitioned.jms.handler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="jms.requests"/>
            <property name="receiveTimeout" value="${partitioned.timeout}"/>
        </bean>
    </property>
    <property name="stepName" value="partitioned.step.name"/>
    <property name="gridSize" value="${step.partitioned.gridSize}"/>
    <property name="replyChannel" ref="partitioned.jms.reply"/>        
</bean>

<int:aggregator 
    input-channel="partitioned.jms.reply" 
    ref="partitioned.jms.handler"
/>

我的配置中是否有遗漏的地方?我以为聚合器会得到所有5条消息的响应,并将其发送到PartitionHandler。

谢谢你

spring-integration spring-batch
1个回答
0
投票

我不得不添加聚合器release-strategy。

    <bean id="partitionReleaseStrategy"
        class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy" 
    />

<int:aggregator 
    input-channel="partitioned.jms.reply" 
    ref="partitioned.jms.handler"
    release-strategy="partitionReleaseStrategy"
/>
© www.soinside.com 2019 - 2024. All rights reserved.