使用KafkaTransactionManager在事务KafkaTemplate中基于事件的提交

问题描述 投票:0回答:1

Spring管理的KafkaTemplate提供

template.send(record).addCallback(...
template.executeInTransaction(...

现在假设我有一个方法doWork(),它在一个事件(比如一个TCP / IP消息)上触发。

@Autowired
KafkaTemplate template;

// This method is triggered on a event
doWork(EventType event){
    switch(event){
        case Events.Type1 :
            template.send(record); break;
        case Events.Type2 :
            // Question : How do I achieve a commit of all my previous sends here?
         default : break;
    }
}

基本上,我需要通过在doWork()或a上添加@Transaction来实现事务

template.executeInTransaction(...

在代码中。但是我想批处理几个[template.send()]并在几次调用doWork()方法后进行提交,我该如何实现呢?

我的生产者配置启用了事务,并且KafkaTransactionManager连接到生产者工厂。

java spring kafka-producer-api spring-kafka
1个回答
0
投票
kafkaTemplate.executeInTransaction(t -> {
    boolean stayIntransaction = true;
    while (stayInTransaction) {
        Event event = readTcp()
        doWork(event);
        stayInTransaction = transactionDone(event);
    }
}

只要doWork()方法使用相同的模板,并且它在回调范围内运行,工作就会在事务中运行。

要么

@Transactional
public void doIt() {
    boolean stayIntransaction = true;
    while (stayInTransaction) {
        Event event = readTcp()
        doWork(event);
        stayInTransaction = transactionDone(event);
    }
}

使用声明式事务时。

如果TCP事件是异步的,您将以某种方式将它们交给运行事务的线程,例如使用BlockingQueue<?>

© www.soinside.com 2019 - 2024. All rights reserved.