使用Akka Stream和Kafka偏移提交将事件从Kafka流到Couchbase

问题描述 投票:0回答:1

[我正在尝试使用Alpakka设计Akka流,以从kafka主题中读取事件并将其放到Couchbase中。

到目前为止,我有以下代码,它似乎可以以某种方式工作:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .map(profile ⇒ {
        RawJsonDocument.create(profile.record.key(), profile.record.value())
      })
      .via(
        CouchbaseFlow.upsertDoc(
          sessionSettings,
          writeSettings,
          bucketName
        )
      )
      .log("Couchbase stream logging")
      .runWith(Sink.seq)

“以某种方式,我的意思是,该流实际上是从主题读取事件,并将它们作为json文档放入Couchbase,尽管我不知道如何将消费者偏移量提交给Kafka,但它看起来甚至还不错。

如果我清楚地了解隐藏在Kafka使用者偏移量后面的主要思想,则万一发生任何故障或重新启动,该流将从上次提交的偏移量中读取所有消息,并且由于我们尚未提交任何偏移量,因此它可能会重新-再次读取在上一个会话中读取的记录。

所以我的假设正确吗?如果是这样,在从Kafka读取并发布到某些数据库的情况下,如何处理使用者提交?官方的Akka Streams文档提供了示例,展示了如何使用普通的Kafka Streams处理此类情况,因此我不知道如何在我的情况下提交偏移量。

非常感谢!

scala akka akka-stream alpakka
1个回答
0
投票

您将需要在Couchbase中提交偏移量,以获得“恰好一次”的语义。

这应该有所帮助:https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka

© www.soinside.com 2019 - 2024. All rights reserved.