任何人都可以解释我地图和mapAsync w.r.t AKKA流之间的区别吗?据说In the documentation
可以使用mapAsync或mapAsyncUnordered执行涉及外部非基于流的服务的流转换和副作用
为什么我们不能简单地在这里映射?我假设Flow,Source,Sink都是Monadic,因此map应该工作得很好w.r.t这些性质的延迟?
签名
差异最好在signatures中突出显示:Flow.map
接受一个返回类型T
的函数,而Flow.mapAsync
接受一个返回类型Future[T]
的函数。
实际例子
例如,假设我们有一个函数,它根据用户ID在数据库中查询用户的全名:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
给定Source
值的akka流UserID
,我们可以在流中使用Flow.map
来查询数据库并将全名打印到控制台:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
此方法的一个限制是此流将一次只进行1 db查询。此串行查询将是一个“瓶颈”,可能会阻止我们的流中的最大吞吐量。
我们可以尝试使用Future
通过并发查询来提高性能:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
这个简单的附录的问题是我们已经有效地消除了背压。
Sink只是拉入未来并添加了foreach println
,与数据库查询相比,它相对较快。该流将不断将需求传播到源并在Flow.map
内产生更多的期货。因此,同时运行的databaseLookup
的数量没有限制。不受干扰的并行查询最终可能使数据库过载。
Flow.mapAsync
救援;我们可以同时进行数据库访问,同时限制同时查找的数量:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
另外请注意Sink.foreach
变得更简单,它不再需要Future[FullName]
而只需要FullName
。
无序异步映射
如果不需要维护UserID到FullNames的顺序排序,那么您可以使用Flow.mapAsyncUnordered
。例如:您只需要将所有名称打印到控制台,但不关心它们的打印顺序。