How do I commit offsets when using KafkaItemReader in a Spring Batch job only after all messages have been processed and written to a .dat file?


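I developed a Spring Batch job that reads from a Kafka topic using the KafkaItemReader class. I want to commit the offsets only when the messages read in a defined chunk have been processed and successfully written to an output .dat file.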

@Bean
public Job kafkaEventReformatjob(
        @Qualifier("MaintStep") Step MainStep,
        @Qualifier("moveFileToFolder") Step moveFileToFolder,
        @Qualifier("compressFile") Step compressFile,
        JobExecutionListener listener) {
    return jobBuilderFactory.get("kafkaEventReformatJob")
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .flow(MainStep)
            .next(moveFileToFolder)
            .next(compressFile)
            .end()
            .build();
}

@Bean
Step MainStep(
        ItemProcessor<IncomingRecord, List<Record>> flatFileItemProcessor,
        ItemWriter<List<Record>> flatFileWriter)
{
    return stepBuilderFactory.get("mainStep")
            .<IncomingRecord, List<Record>>chunk(5000)
            .reader(kafkaItemReader())
            .processor(flatFileItemProcessor)
            .writer(writer())
            .listener(basicStepListener)
            .build();
}
// The reader reads all messages from the Kafka topic and returns them as IncomingRecord objects.
@Bean
KafkaItemReader<String, IncomingRecord> kafkaItemReader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    List<Integer> partitions = new ArrayList<>();
    partitions.add(0);
    partitions.add(1);
    return new KafkaItemReaderBuilder<String, IncomingRecord>()
            .partitions(partitions)
            .consumerProperties(props)
            .name("records")
            .saveState(true)
            .topic(topic)
            .pollTimeout(Duration.ofSeconds(40L))
            .build();
}

@Bean
public ItemWriter<List<Record>> writer() {
    ListUnpackingItemWriter<Record> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(flatWriter());
    return listUnpackingItemWriter;
}

public ItemWriter<Record> flatWriter() {
    FlatFileItemWriter<Record> fileWriter = new FlatFileItemWriter<>();
    String tempFileName = "abc";
    LOGGER.info("Output File name " + tempFileName + " is in working directory ");
    String workingDir = service.getWorkingDir().toAbsolutePath().toString();
    Path outputFile = Paths.get(workingDir, tempFileName);
    fileWriter.setName("fileWriter");
    fileWriter.setResource(new FileSystemResource(outputFile.toString()));
    fileWriter.setLineAggregator(lineAggregator());
    fileWriter.setForceSync(true);
    fileWriter.setFooterCallback(customFooterCallback());
    fileWriter.close();
    LOGGER.info("Successfully created the file writer");
    return fileWriter;

}

@StepScope
@Bean
public TransformProcessor processor() {
    return new TransformProcessor();
}

=============================================================================

Writer class

@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

@AfterStep
public void afterStep(StepExecution stepExecution) {
    this.stepExecution.setWriteCount(count);
}

@Override
public void write(final List<? extends List<Record>> lists) throws Exception {

    List<Record> consolidatedList = new ArrayList<>();
    for (List<Record> list : lists) {
        // Check for null first; the original order would dereference a null list.
        if (list != null && !list.isEmpty())
            consolidatedList.addAll(list);
    }

    delegate.write(consolidatedList);
    count += consolidatedList.size(); // to count Trailer record count
}

=============================================================================

Item processor

@Override
public List<Record> process(IncomingRecord record) {

    List<Record> recordList = new ArrayList<>();

    if (null != record.getEventName() /* and a few other conditions inside this section */) {
        // Set the values of the Record class by extracting them from the IncomingRecord,
        // then add the valid records that match the condition to recordList.
    } else {
        // Returning null tells Spring Batch to filter this item out.
        return null;
    }
    return recordList;
}

apache-kafka spring-batch kafka-consumer-api spring-kafka
1 Answer

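Synchronizing a read operation and a write operation between two transactional resources (for example, a queue and a database) is possible by using a JTA transaction manager that coordinates both transaction managers (the 2PC protocol).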

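However, this approach does not work if one of the resources is not transactional (like most file systems). So unless you use a transactional file system together with a JTA transaction manager that coordinates a Kafka transaction manager and a file system transaction manager, you need another approach, such as the Compensating Transaction pattern. In your case, the "undo" operation (the compensating action) would be rewinding the offset to where it was before the failed chunk.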
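
The compensating action described above can be illustrated with a minimal plain-Java sketch. It deliberately avoids the Kafka and Spring Batch APIs: `FakePartition`, `ChunkWriter`, and `processChunk` are hypothetical stand-ins invented for this illustration, and the "partition" is just an in-memory list. The idea is only to show the control flow: remember the position before reading a chunk, commit after a successful write, and seek back to the remembered position if the write fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory sketch; none of these types are Kafka or Spring Batch classes. */
public class CompensatingOffsetSketch {

    /** Simulates one partition: a list of messages plus a read position and a committed offset. */
    static class FakePartition {
        final List<String> messages;
        long position = 0;   // next offset to read
        long committed = 0;  // offset up to which output is known to be written

        FakePartition(List<String> messages) {
            this.messages = messages;
        }

        /** Reads up to max messages and advances the position. */
        List<String> poll(int max) {
            int from = (int) position;
            int to = Math.min(messages.size(), from + max);
            position = to;
            return new ArrayList<>(messages.subList(from, to));
        }

        void seek(long offset) {
            position = offset;
        }

        void commit() {
            committed = position;
        }
    }

    /** Stand-in for the file writer; may fail with any exception. */
    interface ChunkWriter {
        void write(List<String> chunk) throws Exception;
    }

    /** Processes one chunk; returns true if it was committed, false if it was compensated. */
    static boolean processChunk(FakePartition partition, int chunkSize, ChunkWriter writer) {
        long before = partition.position;            // remember where the chunk started
        List<String> chunk = partition.poll(chunkSize);
        try {
            writer.write(chunk);
            partition.commit();                      // commit only after a successful write
            return true;
        } catch (Exception e) {
            partition.seek(before);                  // compensating action: rewind the offset
            return false;
        }
    }

    public static void main(String[] args) {
        FakePartition partition = new FakePartition(Arrays.asList("a", "b", "c", "d"));
        List<String> output = new ArrayList<>();

        processChunk(partition, 2, output::addAll);
        // Simulate a failed write: the position is rewound and nothing is committed.
        processChunk(partition, 2, chunk -> { throw new java.io.IOException("disk full"); });
        // Retrying re-reads the same chunk from the rewound position.
        processChunk(partition, 2, output::addAll);

        System.out.println("output=" + output + ", committed=" + partition.committed);
    }
}
```

In this sketch the failing writer throws before writing anything. A real file writer could fail midway through a chunk, in which case the compensating action would presumably also have to discard the partial output so that the retried chunk is not written twice.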
