我想用Alpakka使用Transactional.Source Api消耗记录,然后用Transactional.flow将记录生成到另一个主题,但是文档中说需要传递TransactionId。
我应该如何创建TransactionId,比如下面的代码。
```via(Transactional.flow(producerSettings, transactionalId))```
是按制作人还是按制作人在阿尔帕卡的记录?
事务性.id在抵御僵尸方面起着重要作用。但是维护一个标识符在不同的生产者会话中是一致的,同时也能正确地围出僵尸,这就有点棘手了。
正确围堵僵尸的关键是确保对于给定的transactional.id来说,读-进程-写循环中的输入主题和分区总是相同的。如果这一点不正确,那么一些消息就有可能通过事务提供的栅栏泄漏。
例如,在一个分布式流处理应用中,假设topic-partition tp0最初是由transactional.id T0处理的。 如果在后来的某个时刻,它可以被映射到另一个具有transactional.id T1的生产者身上,那么T0和T1之间就没有围栏了。所以,tp0的消息有可能被重新处理,违反了精确一次处理的保证。
实际操作中,要么必须将输入分区和transactional.ids之间的映射存储在外部存储中,要么对其进行一些静态编码。Kafka Streams选择了后一种方式来解决这个问题。事务如何执行,以及如何调整它们