用于处理JVM上的流数据的Akka实现
我有一个简单的流声明如下:Source.tick(Duration.ofSeconds(1),Duration.ofSeconds(30),Files.list(rootDir).collect(Collectors.toList()))。mapConcat(files - >。 ..
Source.fromIterator(...)提供了迭代器线程安全性
Iterator传递给Source.fromIterator应该是线程安全的吗?官方文档链接会很好。
我想将文件传递给流,但在某个中间阶段,我想通过将其转换为行流来处理它。我有这样的东西:Stream.flow(file1,file2,file3).via(...
任何人都可以解释我地图和mapAsync w.r.t AKKA流之间的区别吗?在文档中,据说流转换和涉及外部非流的副作用...
我对akka世界有点新意,所以我的知识领域有点小。我正在创建一个https服务器并使用akka流和http处理它,对于特定的URL,我需要将文件发送回...
是否有一种以特殊方式处理Akka流的Source第一元素的惯用方法?我现在拥有的是:var firstHandled = false source.map {elem => if(!firstHandled){...
我有这个代码:def makeFlow()(隐式超时:超时):流[Any,Any,NotUsed] = {val ref:ActorRef = startActor()Flow [Any] .mapAsync(42)(ref.ask)}这个会泄漏演员。每一个......
Akka Streams - 用于Source.unfoldAsync的Backpressure
我目前正在尝试读取分页的HTTP资源。每个页面都是一个多部分文档,如果页面包含更多内容,页面的响应将包含标题中的下一个链接。一个......
如何优化akka-stream中REST API调用的吞吐量
我目前正在学习akka流,我正在尝试实现一个简单的流,它从源获取项目并为每个项目调用REST api。我的代码的简化版本看起来像......
如何在akka-Http中返回JSON而不是Case Class
我创建了一个路由来流式传输JSON中的案例类列表。但是如果我使用ByteString,则打印case类而不是JSON def streamRoute:Route = pathEndOrSingleSlash {val byteString = ...
Akka stream actor-conflation-ratelimit-actor丢弃了前几条消息(有时)
一个简单的混合组合(下面)有时会在staartup上打印一条调试消息,说它因为零需求而丢弃消息。我希望混淆阶段能够提供无限的需求,所以......
下面的代码在运行15分钟内(EC配置xms 1024 xmx2G)在EC2实例上抛出OOO,但在intellij上运行时不会抛出任何错误。 SqsSource(queueUrl,// parallelism = maxBufferSize / ...
我正在制定一项要求,我们需要从Kafka读取消息并将其保存(汇总)到Hive。我可以考虑使用不同技术的多种实现:Akka stream - 来源......
为什么使用Sink.asPublisher创建的Publisher在BroadcastHub使用时不起作用?
我们有一个多组件应用程序,它在组件之间提供Reactive Streams API。一些组件使用Akka Streams实现,其他组件使用例如Akka Streams。反应堆。在一个组件中......
在https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html上查看Flow类的定义,它具有以下签名:final class Flow [-In,+ Out ,+ Mat]问题......
我正在尝试编写一段代码来消耗代码流(公司的股票交易所代码),并从每个代码的REST API中获取公司信息。我想要取......
我刚刚开始使用akka流,所以这可能是一个微不足道的问题,但我还没有找到答案。我有一个类型为[String,Something]的输入流,我需要摆脱...
如何将alpakka kafka与akka stream websocket结合起来
我使用https://doc.akka.io/docs/alpakka-kafka/current/consumer.html来使用来自kafka的数据如下:隐式val系统:ActorSystem = ActorSystem(“SAPEVENTBUS”)隐式val ...
我正在尝试创建materialize值,当我创建一个源如下所示:case类Info(value:String)val source:Source [String,Future [Info]] = Source.single(“Start”)但它确实...