随着阿卡流,我怎么知道什么时候一个源已完成?

问题描述 投票:0回答:2

我有我周围保持请求之间的Alpakka Elasticsearch Sink。当我的请求,我创建一个HTTP请求Source并把它转换成Elasticsearch Sources的WriteMessage,然后运行与mySource.runWith(theElasticseachSink)

  1. 如何当源已完成我得到通知?没有什么有用的似乎是物化。
  2. 将源完成传递到接收器,这意味着我必须建立一个新的每一次?
  3. 如果是上述情况,将与Flow.fromSourceAndSink帮助莫名其妙脱钩呢?

我的目标是要知道什么时候该HTTP下载完成(包括它穿过vias),并能够重用下沉。

elasticsearch akka akka-stream reactive-streams alpakka
2个回答
0
投票

你可以通过一个流动的单件围绕你愿意,你甚至可以绕过整个executabe图(这些都是immutables)。运行()调用物化流,但不会改变你的图形或其部件。

1)既然你想知道什么时候HttpDownload通过的流量,为什么不使用全图表未来[完成]?假设您的来电elasticsearch是异步的,这应该是平等的,因为你的水槽只是触发呼叫,并不会等待。你也可以使用Source.queue(https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html),只是增加你的消息队列,然后重新使用已定义的图,从而你可以需要proocessing时添加新的消息。这其中也物化一个SourceQueueWithComplete让你停止流。除此之外,只要需要,而无需等待使用它另一个流重用下沉。

2)如上所述:没有,则不需要多次实例化一个水槽。

最好的问候,安迪


0
投票

事实证明,Alpakka的Elasticsearch库还支持流量的形状,所以我可以有我的源经由去,并通过该物化未来的任何下沉运行。 Sink.foreach工作正常这里测试的目的,例如,在https://github.com/danellis/akka-es-test

Flow fromFunction { product: Product =>
    WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")

定义es.flow然后

val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
    .via(scanner)
    .via(CsvToMap.toMap(Utf8))
    .map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
    .via(es.flow)

val futureDone = graph.runWith(Sink.foreach(println))

futureDone onComplete {
    case Success(_) => println("Done")
    case Failure(e) => println(e)
}
© www.soinside.com 2019 - 2024. All rights reserved.