akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

如何创建流程?

我刚刚开始学习akka流并想创建一个流程。我会说,当我尝试使用它时,我确实理解流的概念但是失败了。从我的例子:最后的类Foo(值:...

回答 1 投票 1

保持。卡夫卡流的例子

我想了解akka流中的Keep.both但我在互联网上找不到一个简单的东西。有人可以提供一个关于Keep.right和Keep.both的非常简单的例子。我试过了: ...

回答 1 投票 0

如何将收到的消息广播到两个不同的流

如何将收到的消息广播到两个不同的流我正在使用akka流websocket客户端来请求和接收数据websocket服务器。通过websocket收到的数据,我......

回答 1 投票 1

在akka流中的ActorSystem关闭

我有一个akka流,它连续消耗来自kafka主题的数据。我从不关闭actor系统,我不希望我的应用程序关闭它是正确的方法吗?什么是正确的方法......

回答 1 投票 0

如何在FUTURE内发送ACK?

我使用一个actor作为Sink如下:import akka.Done import akka.actor。{Actor,ActorLogging,Props} import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.Http import ...

回答 2 投票 0

如何检测空Akka源

在某些边缘情况下,我返回一个空的Source。调用者有没有办法在运行之前检查返回的Source是否为空?

回答 2 投票 0

如果网络服务器仍然有效,怎么弄清楚

我通过websocket客户端将传入的消息从Kafka转发到Web服务器。以下代码显示了我是如何做到的:import akka.Done import akka.actor.ActorSystem import akka.http.scaladsl ....

回答 1 投票 0

如何保持连接打开websocket服务器?

我有以下代码,不保持连接打开websocket服务器:import akka.Done import akka.actor。{Actor,ActorLogging,Props} import akka.http.scaladsl.model.StatusCodes ...

回答 1 投票 0

复合流是否会循环?

我试图理解,下面的代码片段是如何工作的:val flow:Flow [Message,Message,Future [Done]] = Flow.fromSinkAndSourceMat(printSink,helloSource)(Keep.left)两个家伙给了...

回答 1 投票 1

如何使用scala读取tcp流

我有一个java jar,可以在特定端口上生成tcp流。我运行java -jar runner.jar之类的java命令,这开始在端口8888上生成消息流。当我这样做...

回答 1 投票 0

如何在akka流中抛出异常?

我想抛出一个异常,如下所示:Source.empty .map {throw new RuntimeException(“Failed”)}。runWith(Sink.foreach(println))。onComplete {...

回答 1 投票 1

如何使用flatMapConcat?

我试图使用flatMapConcat如下:Source.empty .flatMapConcat {Source.fromFuture(Future(“hello”))}。runWith(Sink.foreach(println))。onComplete {...

回答 1 投票 0

定义自定义分区阶段的问题(无法拉入端口两次)

所以我有一个这个小的自定义舞台用于在Akka Streams中进行分区。 object CustomPartitioner {/ ** *创建一个分区阶段,给定类型A,决定是否......

回答 1 投票 0

Google PubSub可能存在编码问题

从Alpakka PubSub库运行订阅源时,我收到了可能的编码数据。 @Singleton类Consumer @Inject()(config:Configuration,credentialsService:google.creds ....

回答 1 投票 0

将来自不同主题的消息保存到Alpakka中的不同文件

我正在试图弄清楚如何将订阅多个主题的Kafka消费者的消息传递到基于主题的处理阶段(例如将它们保存到特定文件或数据库或其他任何内容)。 ...

回答 1 投票 0

改变Akka流的未来

我正在尝试使用Akka Streams同时向服务器发送请求,然后尝试将每个请求与原始上下文(在此示例中为Int)相关联。这是我把它放在一起的流程:......

回答 1 投票 0

如何使用某些物化值创建接收器

我可以像这样构造简单的接收器:Flow [Int] .to(Sink.ignore)但是,这个接收器的类型为Sink [Int,NotUsed],而我需要一些物化值类型。我该如何创建,例如沉[Int,String]?

回答 1 投票 0

阿卡:有没有一个从未拉过的水槽?

需要一个永不拉动的接收器,用于单元测试。有一个已经可用或我需要自己编码吗?请注意,Sink.ignore()无济于事,因为它总是拉动。我需要一个水槽......

回答 2 投票 1

如何捕获java.net.ConnectException:连接在akka流上被拒绝了?

我有一个kafka消费者,看起来如下:import akka.actor.ActorSystem import akka.kafka.scaladsl.Consumer import akka.kafka。{ConsumerSettings,Subscriptions} import akka.stream ....

回答 3 投票 1

Keep组合的含义是什么?

我正在尝试在akka流中的Keep组合下创建以下示例:import java.nio.file.Paths import akka.NotUsed import akka.actor.ActorSystem import akka.stream。{...

回答 1 投票 3

© www.soinside.com 2019 - 2024. All rights reserved.