我有我周围保持请求之间的Alpakka Elasticsearch Sink
。当我的请求,我创建一个HTTP请求Source
并把它转换成Elasticsearch Source
s的WriteMessage
,然后运行与mySource.runWith(theElasticseachSink)
。
Flow.fromSourceAndSink
帮助莫名其妙脱钩呢?我的目标是要知道什么时候该HTTP下载完成(包括它穿过via
s),并能够重用下沉。
你可以通过一个流动的单件围绕你愿意,你甚至可以绕过整个executabe图(这些都是immutables)。运行()调用物化流,但不会改变你的图形或其部件。
1)既然你想知道什么时候HttpDownload通过的流量,为什么不使用全图表未来[完成]?假设您的来电elasticsearch是异步的,这应该是平等的,因为你的水槽只是触发呼叫,并不会等待。你也可以使用Source.queue(https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html),只是增加你的消息队列,然后重新使用已定义的图,从而你可以需要proocessing时添加新的消息。这其中也物化一个SourceQueueWithComplete让你停止流。除此之外,只要需要,而无需等待使用它另一个流重用下沉。
2)如上所述:没有,则不需要多次实例化一个水槽。
最好的问候,安迪
事实证明,Alpakka的Elasticsearch库还支持流量的形状,所以我可以有我的源经由去,并通过该物化未来的任何下沉运行。 Sink.foreach
工作正常这里测试的目的,例如,在https://github.com/danellis/akka-es-test。
Flow fromFunction { product: Product =>
WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")
定义es.flow
然后
val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
.via(scanner)
.via(CsvToMap.toMap(Utf8))
.map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
.via(es.flow)
val futureDone = graph.runWith(Sink.foreach(println))
futureDone onComplete {
case Success(_) => println("Done")
case Failure(e) => println(e)
}