akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

将 Akka GraphDSL 与 Zip 阶段结合使用

考虑以下代码: GraphDSL.create() { 隐式构建器 => 导入 GraphDSL.Implicits._ val in = 源(0 到 10) val fanOut = builder.add(广播[Int](2)) val toString = 构建...

回答 1 投票 0

Akka Streams:如何使用 GraphDSL 构建源中源?

这是一个简单的场景。 让我们从单个 Akka 源开始:比方说,从数据库检索的行。基于分区函数,不同的行需要被转移到不同的...

回答 2 投票 0

在失败的 akka 流上跳过流程

我不想在不丢失发生故障时发送的数据的情况下跳过流程。但我找不到办法做到这一点。这是我用来测试的示例代码。 val 决策者:Supervision.Dec...

回答 1 投票 0

通过合并 akka 流中的多个慢速源来保持排序

我需要合并多个慢速源但保持顺序。 如果代码执行两次,顺序必须相同。 非常简单的解决方案如下: 来源 .来自(分区()...

回答 1 投票 0

我们如何使用akka kafka连接器控制从kafka队列轮询消息

我们有一个用例,我们需要根据参与者的可用性使用 akka kafka 连接器控制来自 kafka 队列的消息轮询,以便处理消息。我们有 2 个消费者 5 个

回答 1 投票 0

akka.http.scaladsl.model.ParsingException:使用 akka http 将大文件上传到 S3 时,多部分实体意外结束

我正在尝试使用 Akka HTTP 和 Alpakka S3 连接器将大文件(目前为 90 MB)上传到 S3。它对于小文件(25 MB)工作正常,但是当我尝试上传大文件(90 MB)时,我得到了

回答 2 投票 0

Akka Streaming - 将块重新分配到 max_permissible_chunk_size Scala

我的代码使用 Akka 流将二进制文件上传到 s3,如下所示: 来源 .via(distributeChunks(MAX_BYTES_PER_CHUNK)) .throttle(maxRequestsPerSecond, 1.秒,

回答 1 投票 0

来自具有阻塞操作的迭代器的 Akka 源代码

Source.fromIterator 上的 Akka 文档 (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) 说: 如果迭代器执行阻塞操作,请确保 r...

回答 1 投票 0

Akka Streams 递归流程调用

我正在尝试使用 Akka Streams 实现分页。目前我有 case class SomeObject(id:Long, next_page:Option[Map[String,String]]) def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =...

回答 2 投票 0

在流中链接 Akka-http-client 请求

我想使用 akka-http-client 作为流来链接 http 请求。链中的每个 http 请求都取决于先前请求的成功/响应,并使用它来构造新请求。如果有...

回答 2 投票 0

处理分页结果的 Akka Streams 流程未完成

我想实现一个流程来处理分页结果(例如,底层服务返回一些结果,但也表明通过发出另一个请求可以获取更多结果,传入例如...

回答 3 投票 0

IllegalStateException - Sink.asPublisher 仅支持一个订阅 - 使用 WebSockets 时

在生产中,我们的应用程序生成了以下堆栈跟踪。这里没有我们自己的代码——没什么可继续的。 了解 akka-stream 库的人可以解释一下: 什么是

回答 1 投票 0

基于URL或本地文件限制流

我有一个 akka http API,用户可以将 S3 URL 发送到服务器。然后,服务器从 AWS 服务器启动流并对源执行后续操作。不过我想验证

回答 1 投票 0

Akka Streams RestartSource.onFailuresWithBackoff 停止条件

如果出现异常,我正在使用 RestartSource.onFailuresWithBackoff 重新启动源,但如果收到某种异常类型,我想停止(取消)重新启动。例如:

回答 2 投票 0

Akka Streams flatMapConcat 在创建新源时停止以前的源

我有一个用例,我有一个提供服务“地址”的源。然后我想将该地址映射到从该服务传输数据的源。现在,万一我会得到......

回答 1 投票 0

如何捕获最大RestartSource后alpakka kafka源流失败

如何捕获最大重启次数后RestartSource的错误? 我想在源失败最大次数后做一些事情。 我可以看到源正在重新启动...

回答 1 投票 0

获取重载方法〜>使用替代方案Akka广播

我正在尝试将传入的 Source[ByteString, Any] 广播到 2 个不同的流,然后扇入(zip)输出。但是我收到错误“重载方法 ~> 使用替代方案”。 瓦尔

回答 1 投票 0

使用 ScalaFX 监控 Akka 流源

我要解决的是以下情况: 给定一个无限运行的 Akka Stream,我希望能够监视流的某些点。我能想到的最好的办法就是把这些乱七八糟的东西送到哪里......

回答 1 投票 0

限制 Akka 流中的源滴答频率并始终对最新消息采取行动

以下是问题的背景: 有一个 Source,它不断地滴答作响,无法保证滴答的频率 我们想要限制源的最大滴答率(例如,我们

回答 1 投票 0

从流中提取 Source[ByteString, Any] 到 2 个接收器中

我正在尝试将传入的 Source[ByteString, Any] 放入 2 个接收器中并尝试在 akka 流图中复制传入流。我想要“is”作为输入流,但不是 t...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.