JBeret and JSR 352: some clarifications about restarts and persistent user data


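I'm currently using JBeret as my batch implementation of the JSR 352 specification.

First question

I'm trying to re-run a chunk job whose processor fails, and to store the restart position so that the job can resume from the last successfully completed index.

Here is the reader: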

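package sample;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumed: the {} placeholders suggest SLF4J; the CDI producer for Logger is not shown

@Named
public class MyReaderFails extends AbstractItemReader {

    @Inject
    Logger logger;

    List<Integer> output;
    int index; // position within "output", not within the whole data set

    @Inject
    StepContext stepContext;

    @Override
    public Object readItem() throws Exception {
        logger.info("reading item: {}", index);
        // note: get() throws IndexOutOfBoundsException once index reaches output.size();
        // a reader normally signals end of data by returning null
        return output.get(index++);
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        logger.info("open: {}", checkpoint);
        int startIndex = Optional.ofNullable(checkpoint).map(Integer.class::cast).orElse(0);
        // on restart the list starts at startIndex, but index still starts at 0
        output = IntStream.range(startIndex, 30).boxed().collect(Collectors.toList());
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        logger.info("current checkpoint: {}", index);
        // relative to the (possibly restarted) list, so only absolute on the first execution
        return index;
    }
}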
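The reader above has two weak spots. Because `index` starts at 0 in every execution, after a restart checkpointInfo() reports a position relative to the restarted sub-list rather than an absolute one. In addition, `output.get(index++)` throws once the list is exhausted, whereas the chunk loop expects readItem() to return null at end of data.

Below is a minimal stdlib-only sketch of a reader that avoids both problems. It assumes the data is simply the integers 0 to 29, as in the question. `RestartableRangeReader` and `ReaderRestartDemo` are hypothetical names. The class only mirrors the ItemReader methods and does not extend AbstractItemReader, so it runs without a batch runtime.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class ReaderRestartDemo {
    public static void main(String[] args) {
        // First execution: read one chunk of 10 items, then "fail" before the next checkpoint.
        RestartableRangeReader first = new RestartableRangeReader(30);
        first.open(null);
        for (int i = 0; i < 10; i++) {
            first.readItem();
        }
        Serializable saved = first.checkpointInfo(); // the value a runtime would persist: 10

        // Restart: the saved checkpoint is handed back to open().
        RestartableRangeReader restarted = new RestartableRangeReader(30);
        restarted.open(saved);
        List<Object> items = new ArrayList<>();
        Object item;
        while ((item = restarted.readItem()) != null) {
            items.add(item);
        }
        System.out.println("resumed at " + items.get(0) + ", read " + items.size()
                + " items, final checkpoint " + restarted.checkpointInfo());
    }
}

// Hypothetical stdlib-only model of a restartable reader (mirrors ItemReader, does not implement it).
class RestartableRangeReader {
    private final int end; // exclusive upper bound of the data set (30 in the question)
    private int position;  // absolute position of the next item to read

    RestartableRangeReader(int end) {
        this.end = end;
    }

    // The checkpoint is an absolute position, so a restart resumes exactly there.
    public void open(Serializable checkpoint) {
        position = Optional.ofNullable(checkpoint).map(Integer.class::cast).orElse(0);
    }

    // Returning null signals end of data instead of throwing.
    public Object readItem() {
        return position < end ? Integer.valueOf(position++) : null;
    }

    public Serializable checkpointInfo() {
        return position;
    }
}
```

Running it prints `resumed at 10, read 20 items, final checkpoint 30`. Because the checkpoint is absolute, a second failure and restart would still resume at the right item.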

Here is the processor:

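package sample;

import java.util.Objects;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumed, as in the reader

@Named
public class MyProcessorFails implements ItemProcessor {

    @Inject
    Logger logger;

    // JBeret converts the String property value to Integer/Boolean (the spec itself only defines String)
    @Inject
    @BatchProperty(name = "itemnumerror")
    Integer itemnumerror;

    @Inject
    @BatchProperty(name = "error")
    Boolean error;

    @Override
    public Object processItem(Object o) throws Exception {
        logger.info("input: {}", o);
        // Objects.equals instead of ==, which compares Integer references and only works for cached small values;
        // Boolean.TRUE.equals avoids a NullPointerException if "error" is not set
        if (Objects.equals(itemnumerror, o) && Boolean.TRUE.equals(error)) {
            throw new RuntimeException(); // the first time the batch is started, throw an error
        }
        Integer output = (Integer) o + 30;

        return output;
    }
}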

Here is the writer:

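package sample;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger; // assumed, as in the reader

@Named
public class MyWriterFails extends AbstractItemWriter {

    @Inject
    Logger logger;

    @Inject
    StepContext stepContext;

    @SuppressWarnings("squid:S2629")
    @Override
    public void writeItems(List<Object> list) throws Exception {
        logger.info("output: {}", list.stream().map(String::valueOf).collect(Collectors.joining(" , ", "{", "}")));
        // accumulate every written item in the step's persistent user data
        ArrayList<Integer> processed = Optional.ofNullable(stepContext.getPersistentUserData())
                .map(ArrayList.class::cast)
                .orElse(new ArrayList<Integer>());
        processed.addAll(list.stream().map(Integer.class::cast).collect(Collectors.toList()));
        stepContext.setPersistentUserData(processed);
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        // ofNullable: the persistent user data is null until something has been stored
        return Optional.ofNullable(stepContext.getPersistentUserData()).map(List.class::cast).map(List::size).orElse(0);
    }
}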
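The writer's checkpointInfo() derives its value from the step's persistent user data, which is null until something has been stored. `Optional.of(null)` throws a NullPointerException, so `Optional.ofNullable` is the safe choice there.

A small stdlib-only check of that logic follows. `WriterCheckpointDemo` and `checkpointFrom` are hypothetical names, and the method takes the persistent user data as a plain parameter instead of reading it from a StepContext.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class WriterCheckpointDemo {
    public static void main(String[] args) {
        System.out.println(checkpointFrom(null));                                               // nothing stored yet
        System.out.println(checkpointFrom(new ArrayList<Integer>(Arrays.asList(30, 31, 32)))); // three items stored
        try {
            Optional.of((Serializable) null);
        } catch (NullPointerException e) {
            System.out.println("Optional.of(null) throws NullPointerException");
        }
    }

    // Same logic as a null-safe writer checkpointInfo(): the number of items in the
    // persistent user data, or 0 when nothing has been stored yet.
    static Serializable checkpointFrom(Serializable persistentUserData) {
        return Optional.ofNullable(persistentUserData)
                .map(List.class::cast)
                .map(List::size)
                .orElse(0);
    }
}
```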

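Now, I expected that when the job fails, the RESTART_POSITION column of the JOB_EXECUTION table would be set to the index returned by checkpointInfo, but that doesn't happen.

So when I try to restart the job from the last execution ID, the checkpoint is always null.

How can I store the checkpoint so that I can retrieve it in the open method?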
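Under JSR 352 the application does not store the checkpoint itself. After each chunk is written, the batch runtime calls checkpointInfo() on the reader and the writer and persists the returned values as part of that chunk's checkpoint. When a failed execution is restarted, the runtime passes the last persisted values back to open().

The following stdlib-only simulation models that cycle with the question's numbers: 30 items, item-count 10, and a failure on item 15. It is a simplified model, not JBeret code. `ChunkStep`, `run()` and the in-memory `repository` field are hypothetical stand-ins for the chunk runner and the job repository.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class ChunkRestartDemo {
    public static void main(String[] args) {
        ChunkStep step = new ChunkStep(30, 10, 15);
        boolean first = step.run(true);   // error=true: fails on item 15
        System.out.println("first run completed: " + first + ", stored checkpoint: " + step.repository);
        boolean second = step.run(false); // restart with error=false
        System.out.println("restart completed: " + second + ", stored checkpoint: " + step.repository);
        System.out.println("items written in total: " + step.written.size());
    }
}

// Hypothetical, simplified model of a chunk step and its checkpoint bookkeeping.
class ChunkStep {
    final int total;       // items 0..total-1, like the question's reader
    final int itemCount;   // <chunk item-count="10">
    final int failingItem; // itemnumerror
    Serializable repository; // last committed reader checkpoint, null before the first commit
    final List<Integer> written = new ArrayList<>();

    ChunkStep(int total, int itemCount, int failingItem) {
        this.total = total;
        this.itemCount = itemCount;
        this.failingItem = failingItem;
    }

    // Runs (or restarts) the step; returns true if it completed, false if it failed.
    boolean run(boolean error) {
        // reader.open(checkpoint): the last committed checkpoint is passed back in
        int position = Optional.ofNullable(repository).map(Integer.class::cast).orElse(0);
        List<Integer> chunk = new ArrayList<>();
        try {
            while (position < total) {
                int item = position++;              // reader.readItem()
                if (error && item == failingItem) { // processor.processItem() fails
                    throw new RuntimeException("failed on item " + item);
                }
                chunk.add(item + 30);               // processor.processItem() result
                if (chunk.size() == itemCount) {
                    commit(chunk, position);
                }
            }
            if (!chunk.isEmpty()) {
                commit(chunk, position);            // last, partial chunk
            }
            return true;
        } catch (RuntimeException e) {
            // The open chunk is rolled back; the repository keeps the previous checkpoint.
            return false;
        }
    }

    // writer.writeItems(), then reader.checkpointInfo() is persisted with the commit.
    private void commit(List<Integer> chunk, int readerCheckpoint) {
        written.addAll(chunk);
        repository = readerCheckpoint;
        chunk.clear();
    }
}
```

It prints `first run completed: false, stored checkpoint: 10`, then `restart completed: true, stored checkpoint: 30` and `items written in total: 30`. This matches the behavior shown in the log further down, where the restarted execution opens with checkpoint 10.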

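Second question

In application tests I often see persistent user data being set on the stepContext object (see also my writer implementation). What is this really for? What am I supposed to do with the saved data?

I also suspect that, when processing thousands of records, this practice causes significant memory overhead.

Any hints?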
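According to the StepContext documentation, persistent user data is saved as part of a step's checkpoint and is available again on restart. It is therefore meant for small state that the step needs in order to resume, such as counters, totals or the last processed key.

Accumulating every processed item, as the writer above does, means the serialized object grows with each chunk and is written again at every checkpoint. The sketch below compares the Java-serialized size of such a list with a compact summary. `WriteSummary` and its fields are hypothetical; which summary you actually need depends on what your restart logic has to know.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

class PersistentDataSizeDemo {
    public static void main(String[] args) {
        ArrayList<Integer> allItems = new ArrayList<>();
        WriteSummary summary = new WriteSummary(0, -1);
        for (int i = 0; i < 10_000; i++) {
            allItems.add(i + 30);           // what the question's writer accumulates
            summary = summary.plus(i + 30); // what a restart may actually need
        }
        System.out.println("full list: " + serializedSize(allItems) + " bytes");
        System.out.println("summary:   " + serializedSize(summary) + " bytes");
    }

    // Java-serialized size, a rough proxy for what is persisted at each checkpoint.
    static int serializedSize(Serializable data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}

// Hypothetical compact persistent user data. Inside a writer it could be used like:
//   WriteSummary s = Optional.ofNullable(stepContext.getPersistentUserData())
//           .map(WriteSummary.class::cast).orElse(new WriteSummary(0, -1));
//   for (Object item : items) { s = s.plus((Integer) item); }
//   stepContext.setPersistentUserData(s);
final class WriteSummary implements Serializable {
    private static final long serialVersionUID = 1L;
    final long count;    // items written so far
    final long lastItem; // last written value, e.g. to log or validate on restart

    WriteSummary(long count, long lastItem) {
        this.count = count;
        this.lastItem = lastItem;
    }

    WriteSummary plus(long item) {
        return new WriteSummary(count + 1, item);
    }
}
```

The summary's serialized size stays constant no matter how many items have been written, while the list's size grows with every chunk.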


Answer

Here is the job XML I used in my test:

<?xml version="1.0" encoding="UTF-8"?>

<job id="fail-restart" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
     version="1.0">
    <step id="fail-restart.step1">
        <chunk item-count="10">
            <reader ref="myReaderFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                </properties>
            </reader>
            <processor ref="myProcessorFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                    <property name="error" value="#{jobParameters['error']}"/>
                    <property name="itemnumerror" value="15"/>
                </properties>
            </processor>
            <writer ref="myWriterFails">
                <properties>
                    <property name="logger" value="java.util.logging.Logger"/>
                </properties>
            </writer>
        </chunk>
    </step>
</job>

Here is the test method:


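import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import org.jberet.runtime.JobExecutionImpl;
import org.junit.Assert;
import org.junit.Test;

public class SimpleIT {
    private static final JobOperator jobOperator = BatchRuntime.getJobOperator();

    @Test
    public void failRestart() throws Exception {
        final Properties params = new Properties();
        params.setProperty("error", String.valueOf(Boolean.TRUE));

        // first execution: the processor throws on item 15, so the job must end FAILED
        final long jobExecutionId = jobOperator.start("fail-restart", params);
        JobExecutionImpl jobExecution = (JobExecutionImpl) jobOperator.getJobExecution(jobExecutionId);
        jobExecution.awaitTermination(5, TimeUnit.MINUTES);
        Assert.assertEquals(BatchStatus.FAILED, jobExecution.getBatchStatus());

        // restart the failed execution without the error; the reader's open() receives the last checkpoint
        params.setProperty("error", String.valueOf(Boolean.FALSE));
        final long restartId = jobOperator.restart(jobExecutionId, params);
        jobExecution = (JobExecutionImpl) jobOperator.getJobExecution(restartId);
        jobExecution.awaitTermination(5, TimeUnit.MINUTES);
        Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getBatchStatus());
    }
}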

Output:

Dec 09, 2019 11:36:18 AM sample.MyReaderFails open
INFO: open: null
output list: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 0
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 0
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 1
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 1
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 2
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 2
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 3
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 3
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 4
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 4
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 5
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 5
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 6
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 6
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 7
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 7
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 8
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 8
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 9
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 9
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 10
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 11
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 11
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 12
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 12
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 13
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 13
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 14
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 14
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 15
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 15
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=6, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=9, readPosition=15, failurePoint=null}
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=10, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
Dec 09, 2019 11:36:18 AM org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job fail-restart, fail-restart.step1, org.jberet.job.model.Chunk@7f938562
java.lang.RuntimeException
    at sample.MyProcessorFails.processItem(MyProcessorFails.java:38)
    at org.jberet.runtime.runner.ChunkRunner.processItem(ChunkRunner.java:422)
    at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:335)
    at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:208)
    at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:225)
    at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:144)
    at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
    at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
    at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
    at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

Dec 09, 2019 11:36:18 AM sample.MyReaderFails open
INFO: open: 10
output list: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 0
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 1
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 11
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 2
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 12
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 3
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 13
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 4
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 14
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 5
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 15
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 6
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 16
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 7
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 17
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 8
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 18
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 9
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 19
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 10
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 10
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 11
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 21
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 12
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 22
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 13
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 23
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 14
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 24
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 15
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 25
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 16
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 26
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 17
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 27
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 18
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 28
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 19
Dec 09, 2019 11:36:18 AM sample.MyProcessorFails processItem
INFO: input: 29
Dec 09, 2019 11:36:18 AM sample.MyWriterFails writeItems
INFO: output: {50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59}
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails readItem
INFO: reading item: 20
Dec 09, 2019 11:36:18 AM sample.MyReaderFails checkpointInfo
INFO: current checkpoint: 21
Weld SE container STATIC_INSTANCE shut down by shutdown hook

Process finished with exit code 0
