akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

流式传输由其他流组成的存档

假设我在s3中有很多文件,并且根据用户请求我想要开始流式传输从其他s3文件流构建的存档。是否可以像下面那样实现流量流? ...

回答 1 投票 3

Akka Streams KillSwitch in alpaca jms

我有一个场景,我使用alpakka启动多个jmsSource(针对不同的队列)。我还需要在任何时候分离队列。所以我把KillSwitch添加到了jms akka ......

回答 1 投票 1

Websocket单元测试:来自ScalaTestRouteTest的WS不会执行websocket请求

我正在尝试为websocket进行单元测试。从doc,我应该可以使用WS参见下面的sscce包com.streamingout import akka.http.scaladsl.model.ws.TextMessage import akka ....

回答 1 投票 2

如果节点崩溃,如何在akka actor中恢复排队的消息?

如果节点崩溃并且在那个时间点消息在邮箱中排队,那么这些消息将如何重新处理?如果他们不能被重新处理那么我们怎么能说akka编程模型是......

回答 2 投票 0

线程阻止Akka-streams-kafka中的Kafka协调员

我使用VisualVM来分析使用akka-streams-kafka的应用程序。它显示了很多Kafka协调员阻塞线程谁是这些协调员?我还有三位卡夫卡消费者......

回答 1 投票 0

同一个流中有多个接收器

我有一个这样的流和两个接收器,但一次只使用一个:Source.fromElements(1,2,3).via(flow).runWith(sink1)或Source.fromElements(1,2,3) .via(flow).runWith(sink2)它是......

回答 2 投票 4

通过多节点上的Akka-Streams关于Kafka消费者瞬态消息的问题

我们正在使用Kafka来存储由我们集群中的节点生成的消息,并将其分发到集群中的所有节点,我主要使用它来处理akka-streams但是有几个......

回答 1 投票 0

Akka-http:连接到localhost上的websocket

我试图通过localhost上的websocket连接到某个服务器。当我尝试在JS中使用ws = new WebSocket('ws:// localhost:8137');它成功了。但是,当我使用akka-http和akka -...

回答 2 投票 0

Alpakka - 处理消息时的回调

如何在每条消息成功发送到JMS或失败后进行回调? val jmsSink = JmsSink.textSink(JmsSinkSettings(connectionFactory).withQueue(“My_Queue”))Source(Stream.from(1))...

回答 1 投票 0

Akka http-client无法使用来自服务器的所有数据流

我正在尝试编写一个简单的akka 流休息端点和客户端来使用此流。但后来我尝试运行服务器和客户端,客户端只能消耗部分流。我看不到任何......

回答 1 投票 2

如何在被动kafka 0.8.2.2中重新运行失败的消息

我想重新发布x无法处理的消息。我看到手动提交代码:val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)Source ....

回答 2 投票 2

如何处理组中的groupby子流

我一直在玩akka-streams,似乎遇到了一个问题,我找不到一个干净的方法来处理这个问题。我有来自1 ... *玩家的活动......

回答 1 投票 0

如何使用基于请求参数的Flows在akka-http中引入非阻塞延迟

我正在启动服务器并使用akka stream connection.handleWith(handleRequest())处理它,其中handleRequest():Flow [HttpRequest,HttpRespnse,_]我需要在发送时创建延迟...

回答 2 投票 0

在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?

我想为以下场景流式传输分块服务器发送事件:订阅Redis密钥,如果密钥更改,则使用Akka Streams流式传输新值。如果有......它应该只流

回答 2 投票 1

如何从cassandra事件流中获取最后一个事件

我正在阅读一个cassandra事件流,并希望得到最后一个元素。我目前正在这样做:def myData:Future [Long] = readJournal(myPersistenceId)。drop(5).take(1).map(...

回答 1 投票 0

在“Akka-Streams”中使用`extrapolate`的用例是什么?

我只是尝试在akka-streams中进行混淆和推断。由于混淆对我来说很有意义,我没有得到推断的用例。我们为什么要为下游添加更多工作 - 当......

回答 1 投票 1

复合流程的目的是什么(来自Sink和Source)?

我正在尝试从网站上理解复合流(来自Sink和Source),它们表示如下:有人可以提供复合流的使用示例。 ...

回答 2 投票 0

协议切换成功与否

我有以下代码片段,不能编译:import akka.actor.ActorSystem import akka.Done import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.stream ....

回答 1 投票 1

在Consumer API中使用createDrainingControl?

我正在浏览Alpakka中Kafka的Consumer API文档。我遇到了这段代码。根据我的理解,使用msg.committableOffset()提交偏移量。那为什么......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.