如何使用Akka流读取mongodb

问题描述 投票:0回答:1

我有一个mongodb,其中包含每日的数据更新和插入,我想实时获取与更新和插入相关的数据,以推送到另一个数据库,用akka流可以做到吗?

scala akka akka-stream
1个回答
0
投票
据我所知mongo提供了这样的功能:

https://docs.mongodb.com/manual/changeStreams/

您只需要将其包装在akka-stream来源中:

https://doc.akka.io/docs/akka/current/stream/stream-customize.html#custom-processing-with-graphstage

[进一步调查后,我发现akka.stream.scaladsl.Source伴随对象包含Source工厂方法fromIteratorlink这需要一个scala.collection.Iterator.

我能够通过使用Java api获得迭代器来创建Source:>

object Main extends App { import org.bson.Document import com.mongodb.MongoClient import com.mongodb.client.MongoDatabase import com.mongodb.client.MongoCollection import com.mongodb.client.MongoCursor import com.mongodb.client.model.changestream.ChangeStreamDocument import scala.collection.JavaConverters._ import akka.stream.scaladsl.Source val mongoClient = new MongoClient() val database: MongoDatabase = mongoClient.getDatabase("mydb"); val inventory: MongoCollection[Document] = database.getCollection("inventory") val cursor: MongoCursor[ChangeStreamDocument[Document]] = inventory.watch().iterator() val next: ChangeStreamDocument[Document] = cursor.next() val changeEventSoruce: Source[Any, akka.NotUsed] = Source.fromIterator(() => cursor.asScala) }

build.sbt:

libraryDependencies += "org.mongodb" % "mongo-java-driver" % "3.11.0" libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.25"

© www.soinside.com 2019 - 2024. All rights reserved.