我有一个mongodb,其中包含每日的数据更新和插入,我想实时获取与更新和插入相关的数据,以推送到另一个数据库,用akka流可以做到吗?
https://docs.mongodb.com/manual/changeStreams/
您只需要将其包装在akka-stream
来源中:
https://doc.akka.io/docs/akka/current/stream/stream-customize.html#custom-processing-with-graphstage
[进一步调查后,我发现akka.stream.scaladsl.Source
伴随对象包含Source
工厂方法fromIterator
:link这需要一个scala.collection.Iterator.
我能够通过使用Java api获得迭代器来创建Source
:>
object Main extends App {
import org.bson.Document
import com.mongodb.MongoClient
import com.mongodb.client.MongoDatabase
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoCursor
import com.mongodb.client.model.changestream.ChangeStreamDocument
import scala.collection.JavaConverters._
import akka.stream.scaladsl.Source
val mongoClient = new MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("mydb");
val inventory: MongoCollection[Document] = database.getCollection("inventory")
val cursor: MongoCursor[ChangeStreamDocument[Document]] =
inventory.watch().iterator()
val next: ChangeStreamDocument[Document] = cursor.next()
val changeEventSoruce: Source[Any, akka.NotUsed] =
Source.fromIterator(() => cursor.asScala)
}
build.sbt:
libraryDependencies += "org.mongodb" % "mongo-java-driver" % "3.11.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.25"