akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

如何根据某些条件处理Akka流?

假设有一些文件流要处理,并且满足条件时仅应处理(消耗)特定文件。即,仅当流包含名为 “ aaa”的文件时,才处理...

回答 1 投票 0

产生消息时在alpakka-kafka连接器中使用的传递是什么?

这是在文档https://doc.akka.io/docs/alpakka-kafka/current/producer.html val单中提供的向Kafka生成单个消息的代码:ProducerMessage.Envelope [KeyType,ValueType,。 ..

回答 1 投票 0

不可改变的流批量处理

此问题的解决方案是否有一个不变的替代方案,它可以在流中批量处理数据:val records = Source(List(Record(Record(1,“ a”),Record(1,“ k”),Record(1) ,“ k”),...

回答 1 投票 2

Akka如何连接源,流和接收器

我创建了一个源,流和接收器。源是一个整数源。该流将整数传输到字符串,并且接收器将其写入文件,但是我不知道如何将所有这些连接在一起...

回答 1 投票 0

无法使用Alpakka使用来自Kafka Consumer的消息

我正在尝试使用alpakka吸收来自kafka的消息。我没有从akka actor那里得到消费者已停止的任何错误,但是它没有使用任何消息。下面是我的代码val ...

回答 1 投票 0

如何计算akka-http的完整响应时间?

path(“ foo”〜Slash。?){complete(HttpEntity(ContentTypes.`application / octet-stream`,ByteString(Files.readAllBytes(Paths.get(“ foo”)))))}说foo是一个1GB文件。我添加了watchTermination ...

回答 1 投票 0

Akka Stream Java-是否可以将未知数量的源合并为一个

我有一个Slick源,可产生多个源:Source myBigSource = Slick.source(slickSession,sqlQueryString,(SlickRow row)-> {return(Source)createNewSource(row,...

回答 1 投票 0

Akka流中的分批/取景

我有一个Source [Animal],其中Animal是Cat和Dog的2种类型。来源类似于dog1,dog2,dog3,cat1,dog4,dog5,cat2,cat3,dog6,dog7,dog8,dog9,dog10,dog11,dog12,cat4 ...我是...

回答 1 投票 0

嘲笑Akka流的Mockito

进行单元测试时,模拟涉及源,流和接收器的Akka Streams调用的最佳方法是什么?例如,takeWhile函数:def takeWhile(p:Out => Boolean):Repr [Out] ...

回答 1 投票 1

Akka流批量处理

学习Akka流。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长将它们分成时间组进行处理...

回答 1 投票 0

使用Akka Stream和Kafka偏移提交将事件从Kafka流到Couchbase

[我正在尝试使用Alpakka设计Akka流,以读取来自kafka主题的事件,并将其放入Couchbase。到目前为止,我有以下代码,它似乎可以以某种方式工作:消费者....

回答 1 投票 0

如何使用Akka流读取mongodb

我有一个mongodb,它每天都会进行数据更新和插入,我想实时获取与更新和插入相关的数据以推送到另一个数据库中,是否可以使用akka来完成它...

回答 1 投票 0

使用流testkit测试Akka流divertTo

我有一个由多个流组成的图,每个流都返回某个错误或实际结果中的一个。该代码使用divertTo以便将所有Lefts发送到与happy -...

回答 1 投票 0

是否可以使用Sink.actorRefWithAck发送批量消息?

我正在使用Akka Streams,并且遇到了Sink.actorRefWithAck。我了解它会发送一条消息,并且仅在对上一个.... >>>

回答 1 投票 0

尝试使用Akka流封送jaxb对象时为空文件

我正在尝试将jaxb对象编组为xml字符串,然后使用akka Streams将其写入文件,但是在执行创建的文件后为空。我已经检查了封送处理方法,但是没有...

回答 1 投票 0

Akka合并水槽而无法访问流程

我正在使用一个接受单个AKKA接收器并用数据填充它的API:def fillSink(sink:Sink [String,_])是否有一种方法,无需深入研究akka的深度,可以处理两个输出...

回答 1 投票 1

随着阿卡流,我怎么知道什么时候一个源已完成?

我有我周围保持请求之间的Alpakka Elasticsearch下沉。当我的请求,我创建一个HTTP请求的来源和把它转换成Elasticsearch WriteMessages的源代码,...

回答 2 投票 0

推动元件外部到在fs2的反应性流

我有一个外部(即,我不能改变它)的Java API它是这样的:公共接口发件人{无效发送(事件E); }我需要实现它接受每个事件发件人,转换...

回答 2 投票 4

watchTermination没有在阿卡-HTTP流量触发

目前我正在试图建立一个阿卡-HTTP WebSocket连接,可以:广播到所有连接的客户端回答客户的某些请求这是我创造我的流量至今:// ...保持

回答 1 投票 0

阿卡HTTP流反序列化JSON

是否有可能动态地反序列化外部,未知长度的,从阿卡HTTP字节字符串流分成域对象?方面,我称之为输出JSON一个无限长的HTTP终结...

回答 3 投票 8

© www.soinside.com 2019 - 2024. All rights reserved.