用于处理JVM上的流数据的Akka实现
当MyActor收到开始消息时,它将运行Akka流,并将接收到的每个项目发布到Akka事件流。 MyActor类(隐式系统:ActorSystem,实现:Materializer,ec:...
[在探索Akka流时,我还遇到了Apache Flink流处理引擎。 Akka流实现反应流并支持背压。因此,如果我必须做出决定...
我正在Scala中使用Alpakka-kafka来消费一个Kafka主题。这是我的代码:val kafkaConsumerSettings:ConsumerSettings [String,String] = ConsumerSettings(actorSystem,new ...
Akka流:使用OverflowStrategy.fail()模拟失败的流
我是Akka和Akka流的新手。我创建了一个虚拟流,并希望它以异常结束,因为我的map()函数非常慢,并且我将缓冲区设置为1。所以我的问题分为两部分:为什么...
我需要调用上游服务(Azure Blob服务)以将数据推送到OutputStream,然后需要通过akka转过来并将其推送回客户端。没有akka(只有servlet代码)...
我对学习Scala,Akka Streams和Akka HTTP并不陌生,因此如果问题太简单,请事先道歉。我想在HTTP请求中执行HTTP请求,就像在下面的...
我正在使用akka / scala / play堆栈。通常,即时消息使用流来执行某些任务。例如,我有一个流,它每分钟都会醒来,从数据库中拾取某些东西,然后调用另一个...
我有一个包装器类AwsS3Bucket,当调用该类时,将返回源Source [ByteString,NotUsed]。在我的单元测试用例中,我模拟了此客户端并执行了必要的断言。 val source = ...
akka流中的Combine运算符具有以下签名:def Combine [T,U](第一个:Source [T,_],第二个:Source [T,_],rest:Source [T,_] *)(策略:Int => Graph [...
Alpakka Consumer不使用通过Docker编写的Kafka发送的消息
我已经通过Docker compose运行了Kafka和Zookeeper。我可以使用Kafka终端向主题发送/使用消息,并且可以通过Conduktor监视所有内容。但不幸的是,我不是...
我如何从Akka Streams接收器引发的异常中恢复?简单示例:Source integerSource = Source.from(Arrays.asList(1、2、3、4、5、6、7、8、9)); ...
im试图使用elasticmq-rest-sqs运行sqs服务器:“ org.elasticmq” %%“ elasticmq-rest-sqs”%“ 0.14.7”,我的akka 依赖项是:val akka = Seq(“ com.lightbend .akka“ %%” akka-stream -...
我当前正在运行类似于以下内容的Akka流设置:┌─────────────┐┐───────────┐│┌ ──────────────┐││REST端点│──▶│队列...
akka.streams。您可以发出值的源(类似于monix.BehaviorSubject)
我正在寻找akka.stream.scaladsl.Source构造方法,该方法可让我简单地从不同代码位置发出下一个值(例如,监视系统事件)。我需要类似于...
akka.UnsupportedAkkaVersion:Akka的当前版本是[2.5.14],但是akka-http需要版本[2.5.26]
这里是课程:import akka.Done;导入akka.NotUsed;导入akka.actor.ActorSystem;导入akka.http.javadsl.ConnectHttp;导入akka.http.javadsl.Http;导入akka.http.javadsl.model ....
在scala Play框架中,在操作结果中流式传输ByteArrayOutputStream
我正在尝试在scala播放框架中将svg图像转换为jpeg图像。我用蜡染,它工作正常。现在,我想在操作结果中流式传输输出,而不是转换...
是否可以从Reactor Flux或RxJava2 Observable / Flowable到AkkaStreams源使用适配器?
有时有一些库,例如R2DBC,它返回一个Reactive Stream,即Reactor Flux,但是在Http层中,我们需要另一个Reactive Stream,即AkkaStreams Source,它是AkkaHttp ...
当我引用类型化的actor系统时,如何为AkkaStreams实例化实现器?
下面的代码无法编译,它表示ActorMaterializer缺少隐式的ActorRefFactory。我应该如何提供一个? val监护人:Behavior [Done] = Behaviors.setup(_ => {Behaviors ....
我遇到了我不理解的编译错误:[错误] /Users/nicu/mooc-reactive/akkahttp-typed-actors-cluster-ORMap-followers/src/main/scala/followers/HttpServer.scala :43:76:类型...