[我正在尝试使用Alpakka设计Akka流,以从kafka主题中读取事件并将其放到Couchbase中。
到目前为止,我有以下代码,它似乎可以以某种方式工作:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicIn))
.map(profile ⇒ {
RawJsonDocument.create(profile.record.key(), profile.record.value())
})
.via(
CouchbaseFlow.upsertDoc(
sessionSettings,
writeSettings,
bucketName
)
)
.log("Couchbase stream logging")
.runWith(Sink.seq)
“以某种方式,我的意思是,该流实际上是从主题读取事件,并将它们作为json文档放入Couchbase,尽管我不知道如何将消费者偏移量提交给Kafka,但它看起来甚至还不错。
如果我清楚地了解隐藏在Kafka使用者偏移量后面的主要思想,则万一发生任何故障或重新启动,该流将从上次提交的偏移量中读取所有消息,并且由于我们尚未提交任何偏移量,因此它可能会重新-再次读取在上一个会话中读取的记录。
所以我的假设正确吗?如果是这样,在从Kafka读取并发布到某些数据库的情况下,如何处理使用者提交?官方的Akka Streams文档提供了示例,展示了如何使用普通的Kafka Streams处理此类情况,因此我不知道如何在我的情况下提交偏移量。
非常感谢!
您将需要在Couchbase中提交偏移量,以获得“恰好一次”的语义。
这应该有所帮助:https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka