Spring管理的KafkaTemplate提供
template.send(record).addCallback(...
template.executeInTransaction(...
现在假设我有一个方法doWork(),它在一个事件(比如一个TCP / IP消息)上触发。
@Autowired
KafkaTemplate template;
// This method is triggered on a event
doWork(EventType event){
switch(event){
case Events.Type1 :
template.send(record); break;
case Events.Type2 :
// Question : How do I achieve a commit of all my previous sends here?
default : break;
}
}
基本上,我需要通过在doWork()或a上添加@Transaction来实现事务
template.executeInTransaction(...
在代码中。但是我想批处理几个[template.send()]并在几次调用doWork()方法后进行提交,我该如何实现呢?
我的生产者配置启用了事务,并且KafkaTransactionManager连接到生产者工厂。
kafkaTemplate.executeInTransaction(t -> {
boolean stayIntransaction = true;
while (stayInTransaction) {
Event event = readTcp()
doWork(event);
stayInTransaction = transactionDone(event);
}
}
只要doWork()
方法使用相同的模板,并且它在回调范围内运行,工作就会在事务中运行。
要么
@Transactional
public void doIt() {
boolean stayIntransaction = true;
while (stayInTransaction) {
Event event = readTcp()
doWork(event);
stayInTransaction = transactionDone(event);
}
}
使用声明式事务时。
如果TCP事件是异步的,您将以某种方式将它们交给运行事务的线程,例如使用BlockingQueue<?>
。