akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

物化值如何在Akka Stream中工作

我了解到,当我运行Akka流图时,它将实现最正确的组件。但是这样做:Source.range(1,100).to(Sink.reduce((a,b)-> a + b))。run(materializer);将...

回答 1 投票 0

Akka:Java中的Flow等效于什么

scala中的Akka流提供了Flow概念。 Java中的等效功能是什么?例如,在Scala中,存在Flow.take(n),但在Java中,存在Source.take(n),它返回的源不是流。

回答 1 投票 0

在Akka中是否可以像RxJava / Reactor一样从Source => Source函数创建Flow?

我仍然没有找到在AkkaStreams中进行“过滤”流程的简便方法。使用fromFunction对我来说很容易执行“映射”流程,但是对我却不行。在RxJava / Reactor中有一个...

回答 1 投票 1

[使用akka-stream-alpakka从s3下载pdf文件,并将其存储为字节数组

我正在尝试使用akka-stream-alpakka连接器从S3下载pdf文件。我有s3路径,并尝试在alpakka s3Client上使用包装方法下载pdf。 def getSource(s3Path:...

回答 1 投票 0

[使用akka-stream-alpakka从s3下载pdf文件

我正在尝试使用akka-stream-alpakka连接器从S3下载pdf文件。我有s3路径,并尝试在alpakka s3Client上使用包装方法下载pdf。 def getSource(s3Path:...

回答 2 投票 0

Akka Alpakka SqsSource足够奇怪,可以与queueUrl和queueName一起使用

我正在使用Akka Streams,还使用alpakka.sqs.scaladsl从sqs队列中读取消息。我已经做过很多次了,但是现在我上传了一个版本,该版本将队列名称而不是队列URL放置在源中...

回答 1 投票 0

基于第一个输入元素选择一个BidiFlow

我有一个协议,其类型为协议:Flow [ByteString,ByteString,NotUsed]。进入流的元素是用户发送的消息,而离开流的元素是来自...

回答 1 投票 0

您如何处理一系列Akka流源?

我们有一个可以处理事件的接收器:def parseEvent():Sink [T,Future [akka.Done]] = {Sink.foreach [T] {event => {//处理事件}}}使用单个源即可正常工作:...

回答 1 投票 1

将序列输入Akka Stream的Combine()

您如何使用Akka Stream的Combine方法合并一系列Source?例如,val source = Seq [Source [T,_]]。似乎没有方法签名需要Seq / Iterable / etc ......>

回答 1 投票 0

您如何处理一系列AKKA源?

我们有一个可以处理事件的接收器:def parseEvent():Sink [T,Future [akka.Done]] = {Sink.foreach [T] {event => {//处理事件}}}使用单个源即可正常工作:...

回答 1 投票 0

如何对我的流程中的上游完成反应?

假设存在如下定义的Akka流:def tee = {var writer:Writer = ??? Flow.fromFunction [String,String] {msg => writer.write(msg)msg}}当上游...

回答 1 投票 0

如何将元素添加到流中

我有一个流程,其中连接了两个S3Sink。在第一个s3Sink中,我希望文件没有标题,在第二个s3Sink(s3SinkHeaders)中,我希望文件具有标题。 val ...

回答 1 投票 0

Akka流延迟和背压

在使用Akka Streams时,我面临着缓冲和背压问题。我有以下代码(为简化问题而进行了简化):对象X扩展了App {...

回答 1 投票 0

Akka:创建图形阶段而不进行扇动

我正在使用GraphStage构建稍微复杂的流,并希望从一个简单的线性图开始逐步进行操作,该线性图不包含任何广播,分区或合并。我...

回答 1 投票 0

akka流是否提供有保证的交付

以下是来自文档(akka):交付保证流裁判员在其运动中使用正常的演员消息传递,因此提供了相同级别的基本交付保证。...

回答 1 投票 0

Akka流2.6。如何创建ActorMaterializer?

从2.6开始,我在此行收到弃用警告:import akka.stream.ActorMaterializer隐式val actorMaterializer = ActorMaterializer()警告:对象ActorMaterializer中应用的方法是...

回答 1 投票 2

通过akka流链接上下文

我正在将一些C#代码转换为scala和akka流。我的C#代码看起来像这样:Task GetPartialResult1Async(Request request)... Task ... ...> ] >>] >> 免责声明,我对C#的async / await并不完全熟悉。 [根据我从C#文档的快速浏览中获得的信息,Task<T>是严格(即渴望,而不是懒惰)评估的计算,如果成功,最终将包含T。 Scala的等效项是Future[T],其中C#代码的等效项是: import scala.concurrent.{ ExecutionContext, Future } def getPartialResult1Async(req: Request): Future[Result1] = ??? def getPartialResult2Async(req: Request): Future[Result2] = ??? def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = { val result1 = getPartialResult1Async(req) val result2 = getPartialResult2Async(req) result1.zipWith(result2) { tup => val (r1, r2) = tup new Result(req, r1, r2) } /* Could also: * for { * r1 <- result1 * r2 <- result2 * } yield { new Result(req, r1, r2) } * * Note that both the `result1.zipWith(result2)` and the above `for` * construction may compute the two partial results simultaneously. If you * want to ensure that the second partial result is computed after the first * partial result is successfully computed: * for { * r1 <- getPartialResult1Async(req) * r2 <- getPartialResult2Async(req) * } yield new Result(req, r1, r2) */ } 在这种情况下不需要Akka Streams,但是如果您有其他需要使用Akka Streams,则可以表示为 val actorSystem = ??? // In Akka Streams 2.6, you'd probably have this as an implicit val val parallelism = ??? // Controls requests in flight val flow = Flow[Request] .mapAsync(parallelism) { req => import actorSystem.dispatcher getPartialResult1Async(req).map { r1 => (req, r1) } } .mapAsync(parallelism) { tup => import actorSystem.dispatcher getPartialResult2Async(tup._2).map { r2 => new Result(tup._1, tup._2, r2) } } /* Given the `getResultAsync` function in the previous snippet, you could also: * val flow = Flow[Request].mapAsync(parallelism) { req => * getResultAsync(req)(actorSystem.dispatcher) * } */ 基于Future的实现的一个优势是,它很容易与您想要在给定上下文中使用的任何Scala抽象的并发/并行性(例如cats,akka流,akka)进行集成。我对Akka Streams集成的一般直觉是在第二个代码块中的注释中指向三层代码的方向。 我没有在问题中描述的任何根本不同的方式。 但是可以显着改善电流: 阶段1:FlowWithContext 代替使用自定义的WithState monad,可以使用内置的FlowWithContext。 这样做的优点是您可以在流上使用标准运算符,而不必担心转换WithState monad。 Akka会为您解决这个问题。 所以代替 def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = Flow[WithState[TState, Request]].mapAsync(_ mapAsync {doRequest(_)}) 我们可以写: def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] = FlowWithContext[Request, TState].mapAsync(doRequest(_)) 不幸的是,虽然FlowWithContext在不需要更改上下文时很容易编写,但是当您需要通过需要将一些当前数据移入上下文的流时,使用起来有点麻烦(就像我们的一样)。为此,您需要转换为Flow(使用asFlow),然后使用FlowWithContext转换回asFlowWithContext。 [我发现在这种情况下,将整个内容写为Flow,最后转换为FlowWithContext是最容易的。 例如: def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = Flow[(Request, TState)] .map(x => (x._1, (x._1, x._2))) .via(partialResult1Flow) .map(x => (x._2._1, (x._2._1, x._1, x._2._2)) .via(partialResult2Flow) .map(x => (Result(x._2._1, x._2._2, x._1), x._2._2)) .asFlowWithContext((a: Request, b: TState) => (a,b))(_._2) .map(_._1) 这更好吗? 在这种情况下,可能会更糟。在其他情况下,您几乎不需要更改上下文会更好。但是,无论哪种方式,我都建议使用它的内置方式,而不要依赖于自定义的monad。 阶段2:viaUsing 为了使它更易于使用,我为Flow和FlowWithContext创建了viaUsing扩展方法: import akka.stream.{FlowShape, Graph} import akka.stream.scaladsl.{Flow, FlowWithContext} object FlowExtensions { implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal { def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] = f.map(x => (func(x), x)).via(flow) } implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal { def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]): FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] = f .asFlow .map(x => (func(x._1), (x._1, x._2))) .via(flow) .asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2) .map(x => (x._1, x._2._1)) } } viaUsing的目的是从当前输出创建FlowWithContext的输入,同时通过将其通过上下文来保留当前输出。结果为Flow,其输出为嵌套流和原始流的输出的元组。 使用viaUsing,我们的示例简化为: def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = FlowWithContext[Request, TState] .viaUsing(x => x)(partialResult1Flow) .viaUsing(x => x._2)(partialResult2Flow) .map(x => Result(x._2._2, x._2._1, x._1)) 我认为这是一项重大改进。我已请求将viaUsing添加到Akka,而不是依赖扩展方法here。

回答 2 投票 1

Akka http testkit,如何测试流式响应计时

我正在尝试测试使用Akka HTTP编写的端点。该端点将某物的来源转换为流HTTP响应。我希望能够测试此响应的时间,因为...

回答 1 投票 2

如何在应用程序重新启动期间维护Alpakka / Akka Streams源状态?

我是Alpakka的新手,正在考虑将其用于系统集成。在整个应用程序重新启动时维持Akka Streams源状态的理想方法是什么?例如:让我们...

回答 1 投票 1

通过akka流束缚数据

我正在将一些C#代码转换为scala和akka流。我的C#代码看起来像这样:Task GetPartialResult1Async(Request request)... Task ... ...> ] >>] >> 免责声明,我对C#的async / await并不完全熟悉。 [根据我从C#文档的快速浏览中获得的信息,Task<T>是严格(即渴望,而不是懒惰)评估的计算,如果成功,最终将包含T。 Scala的等效项是Future[T],其中C#代码的等效项是: import scala.concurrent.{ ExecutionContext, Future } def getPartialResult1Async(req: Request): Future[Result1] = ??? def getPartialResult2Async(req: Request): Future[Result2] = ??? def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = { val result1 = getPartialResult1Async(req) val result2 = getPartialResult2Async(req) result1.zipWith(result2) { tup => val (r1, r2) = tup new Result(req, r1, r2) } /* Could also: * for { * r1 <- result1 * r2 <- result2 * } yield { new Result(req, r1, r2) } * * Note that both the `result1.zipWith(result2)` and the above `for` * construction may compute the two partial results simultaneously. If you * want to ensure that the second partial result is computed after the first * partial result is successfully computed: * for { * r1 <- getPartialResult1Async(req) * r2 <- getPartialResult2Async(req) * } yield new Result(req, r1, r2) */ } 在这种情况下不需要Akka Streams,但是如果您有其他需要使用Akka Streams,则可以表示为 val actorSystem = ??? // In Akka Streams 2.6, you'd probably have this as an implicit val val parallelism = ??? // Controls requests in flight val flow = Flow[Request] .mapAsync(parallelism) { req => import actorSystem.dispatcher getPartialResult1Async(req).map { r1 => (req, r1) } } .mapAsync(parallelism) { tup => import actorSystem.dispatcher getPartialResult2Async(tup._2).map { r2 => new Result(tup._1, tup._2, r2) } } /* Given the `getResultAsync` function in the previous snippet, you could also: * val flow = Flow[Request].mapAsync(parallelism) { req => * getResultAsync(req)(actorSystem.dispatcher) * } */ 基于Future的实现的一个优势是,它很容易与您想要在给定上下文中使用的任何Scala抽象的并发/并行性(例如cats,akka流,akka)进行集成。我对Akka Streams集成的一般直觉是在第二个代码块中的注释中指向三层代码的方向。 我没有在问题中描述的任何根本不同的方式。 但是可以显着改善电流: 阶段1:FlowWithContext 代替使用自定义的WithState monad,可以使用内置的FlowWithContext。 这样做的优点是您可以在流上使用标准运算符,而不必担心转换WithState monad。 Akka会为您解决这个问题。 所以代替 def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = Flow[WithState[TState, Request]].mapAsync(_ mapAsync {doRequest(_)}) 我们可以写: def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] = FlowWithContext[Request, TState].mapAsync(doRequest(_)) 不幸的是,虽然FlowWithContext在不需要更改上下文时很容易编写,但是当您需要通过需要将一些当前数据移入上下文的流时,使用起来有点麻烦(就像我们的一样)。为此,您需要转换为Flow(使用asFlow),然后使用FlowWithContext转换回asFlowWithContext。 [我发现在这种情况下,将整个内容写为Flow,最后转换为FlowWithContext是最容易的。 例如: def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = Flow[(Request, TState)] .map(x => (x._1, (x._1, x._2))) .via(partialResult1Flow) .map(x => (x._2._1, (x._2._1, x._1, x._2._2)) .via(partialResult2Flow) .map(x => (Result(x._2._1, x._2._2, x._1), x._2._2)) .asFlowWithContext((a: Request, b: TState) => (a,b))(_._2) .map(_._1) 这更好吗? 在这种情况下,可能会更糟。在其他情况下,您几乎不需要更改上下文会更好。但是,无论哪种方式,我都建议使用它的内置方式,而不要依赖于自定义的monad。 阶段2:viaUsing 为了使它更易于使用,我为Flow和FlowWithContext创建了viaUsing扩展方法: import akka.stream.{FlowShape, Graph} import akka.stream.scaladsl.{Flow, FlowWithContext} object FlowExtensions { implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal { def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] = f.map(x => (func(x), x)).via(flow) } implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal { def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]): FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] = f .asFlow .map(x => (func(x._1), (x._1, x._2))) .via(flow) .asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2) .map(x => (x._1, x._2._1)) } } viaUsing的目的是从当前输出创建FlowWithContext的输入,同时通过将其通过上下文来保留当前输出。结果为Flow,其输出为嵌套流和原始流的输出的元组。 使用viaUsing,我们的示例简化为: def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = FlowWithContext[Request, TState] .viaUsing(x => x)(partialResult1Flow) .viaUsing(x => x._2)(partialResult2Flow) .map(x => Result(x._2._2, x._2._1, x._1)) 我认为这是一项重大改进。我已请求将viaUsing添加到Akka,而不是依赖扩展方法here。

回答 2 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.