I have developed a Spring Batch job that reads from a Kafka topic using the KafkaItemReader class. I want to commit the offset only once the messages read in a defined chunk have been processed and successfully written to the output .dat file.
@Bean
public Job kafkaEventReformatJob(
        @Qualifier("MainStep") Step mainStep,
        @Qualifier("moveFileToFolder") Step moveFileToFolder,
        @Qualifier("compressFile") Step compressFile,
        JobExecutionListener listener) {
    return jobBuilderFactory.get("kafkaEventReformatJob")
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .flow(mainStep)
            .next(moveFileToFolder)
            .next(compressFile)
            .end()
            .build();
}
@Bean
Step MainStep(
        ItemProcessor<IncomingRecord, List<Record>> flatFileItemProcessor,
        ItemWriter<List<Record>> flatFileWriter) {
    return stepBuilderFactory.get("mainStep")
            .<IncomingRecord, List<Record>>chunk(5000)
            .reader(kafkaItemReader())
            .processor(flatFileItemProcessor)
            .writer(writer())
            .listener(basicStepListener)
            .build();
}
// The reader reads all messages from the Kafka topic and returns them as IncomingRecord objects.
@Bean
KafkaItemReader<String, IncomingRecord> kafkaItemReader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    List<Integer> partitions = new ArrayList<>();
    partitions.add(0);
    partitions.add(1);
    return new KafkaItemReaderBuilder<String, IncomingRecord>()
            .partitions(partitions)
            .consumerProperties(props)
            .name("records")
            .saveState(true)
            .topic(topic)
            .pollTimeout(Duration.ofSeconds(40L))
            .build();
}
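Since the goal is that offsets advance only with successful chunks, one common precaution (an assumption here, not something the question states) is to make sure the underlying Kafka consumer does not auto-commit offsets behind the job's back; with saveState(true), KafkaItemReader keeps its position in the step's ExecutionContext instead. A minimal sketch of such consumer properties (broker address and deserializers are illustrative):

```java
import java.util.Properties;

public class ConsumerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keep Kafka from committing offsets on its own; the reader's
        // restart position then comes solely from the ExecutionContext.
        props.put("enable.auto.commit", "false");
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(props.getProperty("enable.auto.commit"));
    }
}
```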
@Bean
public ItemWriter<List<Record>> writer() {
    ListUnpackingItemWriter<Record> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(flatWriter());
    return listUnpackingItemWriter;
}

public ItemWriter<Record> flatWriter() {
    FlatFileItemWriter<Record> fileWriter = new FlatFileItemWriter<>();
    String tempFileName = "abc";
    LOGGER.info("Output file name " + tempFileName + " is in the working directory");
    String workingDir = service.getWorkingDir().toAbsolutePath().toString();
    Path outputFile = Paths.get(workingDir, tempFileName);
    fileWriter.setName("fileWriter");
    fileWriter.setResource(new FileSystemResource(outputFile.toString()));
    fileWriter.setLineAggregator(lineAggregator());
    fileWriter.setForceSync(true);
    fileWriter.setFooterCallback(customFooterCallback());
    // Do not call close() here: the writer's open/close lifecycle is managed by the step.
    LOGGER.info("Successfully created the file writer");
    return fileWriter;
}
@StepScope
@Bean
public TransformProcessor processor() {
return new TransformProcessor();
}
=============================================================================
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

@AfterStep
public void afterStep(StepExecution stepExecution) {
    this.stepExecution.setWriteCount(count);
}

@Override
public void write(final List<? extends List<Record>> lists) throws Exception {
    List<Record> consolidatedList = new ArrayList<>();
    for (List<Record> list : lists) {
        if (list != null && !list.isEmpty()) { // null check must come before isEmpty()
            consolidatedList.addAll(list);
        }
    }
    delegate.write(consolidatedList);
    count += consolidatedList.size(); // to count the trailer record count
}
===============================================================
@Override
public List<Record> process(IncomingRecord record) {
    List<Record> recordList = new ArrayList<>();
    if (null != record.getEventName() /* and a few other conditions inside this section */) {
        // setting values of the Record class by extracting them from the IncomingRecord
        recordList.add(/* the valid records matching the conditions */);
    } else {
        return null; // filtered: the item is not passed on to the writer
    }
    return recordList;
}
Synchronizing a read operation and a write operation between two transactional resources (for example a queue and a database) is possible by using a JTA transaction manager that coordinates both transaction managers (2PC protocol).

However, this approach cannot be used if one of the resources is not transactional, which is the case for most file systems. So, unless you use a transactional file system together with a JTA transaction manager that coordinates a Kafka transaction manager and a file-system transaction manager, you need another approach, such as the Compensating Transaction pattern. In your case, the "undo" operation (the compensating action) would be to rewind the offset to its position before the failed chunk.
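That compensating action can be sketched without any Spring Batch types. The class below (OffsetTracker, the chunk sizes, and the failure simulation are all assumptions for illustration, not the framework's API) stages per-partition offsets while a chunk is in flight, promotes them when the write succeeds, and discards them, i.e. rewinds, when it fails:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: tracks per-partition offsets and rewinds them
// when a chunk fails, mimicking the "undo" of a compensating transaction.
class OffsetTracker {
    private final Map<Integer, Long> committed = new HashMap<>();
    private final Map<Integer, Long> inFlight = new HashMap<>();

    // Stage the position after each read; nothing is committed yet.
    void recordRead(int partition, long offset) {
        inFlight.put(partition, offset + 1); // next offset to read
    }

    // Chunk written successfully: promote the staged offsets.
    void commitChunk() {
        committed.putAll(inFlight);
        inFlight.clear();
    }

    // Chunk failed: discard staged offsets (rewind to the last committed position).
    void rollbackChunk() {
        inFlight.clear();
    }

    long nextOffset(int partition) {
        return committed.getOrDefault(partition, 0L);
    }
}

public class Demo {
    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();

        // Chunk 1: reads offsets 0..4 on partition 0, write succeeds.
        for (long o = 0; o < 5; o++) tracker.recordRead(0, o);
        tracker.commitChunk();
        System.out.println(tracker.nextOffset(0)); // 5

        // Chunk 2: reads offsets 5..9, write fails -> rewind.
        for (long o = 5; o < 10; o++) tracker.recordRead(0, o);
        tracker.rollbackChunk();
        System.out.println(tracker.nextOffset(0)); // still 5
    }
}
```

In a real job the "rewind" would mean re-seeking the consumer (or restoring the reader's saved state) to the last committed position, so a restarted or retried chunk re-reads exactly the records whose write failed.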