用于处理JVM上的流数据的Akka实现
MergeLatest的官方文档指出:MergeLatest会为从某个输入流发出的每个元素发出列表,但仅在每个输入流发出至少一个元素之后。我的...
有人可以举个例子,通过Akka Streaming读取HDFS位置上可用的文本文件吗?我是Akka Stream的新手,在搜寻了那么多内容之后,无法找到相同的任何解决方案。 ...
我目前正在开发一个必须处理源自压缩文件的元素流的应用程序。此应用程序的限制应该是,只有X数量的元素可以在...
akka-http net :: err_incomplete_chunked_encoding 200(确定)
我正在尝试使用Akka-http:将大/经常缓慢的响应发送到浏览器,以呈现为excel文件,即:在ui代码$ http({方法中:“ post”,url:“ myUrl”,数据:“大量...
我定义了一个简单的图形,该图形将恒定流(通过Source.Single(5000)定义)与非恒定源(例如Source(1至100))组合在一起。我的图通过...
[Alpakka S3从存储桶下载文件,保存到文件,并且文件名可用于流程的下一部分
我正在尝试构建使用S3密钥的代码,然后从S3下载这些文件,然后将该数据保存为磁盘上具有密钥名的文件(对于流程的进一步处理是必需的),并作为...
我们有一个Scala应用程序,该程序可从文本文件中读取行并使用Akka Stream处理它们。为了获得更好的性能,我们将并行度设置为5。问题是,如果多行包含相同的...
无法在Akka Stream中使用GraphStage类运行SourceShape
我正在尝试使用GraphStage构造创建Redis Akka流源。这个想法是,每当我从subscription方法获得更新时,我都会将其推送到下一个组件。另外,如果存在...
Akka流2.6。如何创建AactorMaterializer?
从2.6开始,我在此行收到弃用警告:import akka.stream.ActorMaterializer隐式val actorMaterializer = ActorMaterializer()警告:对象ActorMaterializer中应用的方法是...
假设有一些文件流要处理,并且满足条件时仅应处理(消耗)特定文件。即,仅当流包含名为 “ aaa”的文件时,才处理...
产生消息时在alpakka-kafka连接器中使用的传递是什么?
这是在文档https://doc.akka.io/docs/alpakka-kafka/current/producer.html val单中提供的向Kafka生成单个消息的代码:ProducerMessage.Envelope [KeyType,ValueType,。 ..
此问题的解决方案是否有一个不变的替代方案,它可以在流中批量处理数据:val records = Source(List(Record(Record(1,“ a”),Record(1,“ k”),Record(1) ,“ k”),...
我创建了一个源,流和接收器。源是一个整数源。该流将整数传输到字符串,并且接收器将其写入文件,但是我不知道如何将所有这些连接在一起...
无法使用Alpakka使用来自Kafka Consumer的消息
我正在尝试使用alpakka吸收来自kafka的消息。我没有从akka actor那里得到消费者已停止的任何错误,但是它没有使用任何消息。下面是我的代码val ...
path(“ foo”〜Slash。?){complete(HttpEntity(ContentTypes.`application / octet-stream`,ByteString(Files.readAllBytes(Paths.get(“ foo”)))))}说foo是一个1GB文件。我添加了watchTermination ...
Akka Stream Java-是否可以将未知数量的源合并为一个
我有一个Slick源,可产生多个源:Source myBigSource = Slick.source(slickSession,sqlQueryString,(SlickRow row)-> {return(Source)createNewSource(row,...
我有一个Source [Animal],其中Animal是Cat和Dog的2种类型。来源类似于dog1,dog2,dog3,cat1,dog4,dog5,cat2,cat3,dog6,dog7,dog8,dog9,dog10,dog11,dog12,cat4 ...我是...
进行单元测试时,模拟涉及源,流和接收器的Akka Streams调用的最佳方法是什么?例如,takeWhile函数:def takeWhile(p:Out => Boolean):Repr [Out] ...
学习Akka流。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长将它们分成时间组进行处理...
使用Akka Stream和Kafka偏移提交将事件从Kafka流到Couchbase
[我正在尝试使用Alpakka设计Akka流,以读取来自kafka主题的事件,并将其放入Couchbase。到目前为止,我有以下代码,它似乎可以以某种方式工作:消费者....