Akka Streams:如何使用 GraphDSL 构建源中源?

问题描述 投票:0回答:2

这是一个简单的场景。

让我们从单个 Akka 源开始:比方说,从数据库检索的行。基于分区函数,不同的行需要被转移到不同的流中。因此,我们使用扇出运算符,为此我们需要使用 GraphDSL(不能仅使用

.via
.to
等简单函数)。

因此,在

GraphDSL.create() { builder => [block]}
的构建器块中,我们创建
UniformFanOutShape
的分区并将原始源的出口与分区的入口连接。目前,分区的出口正悬在微风中。

接下来,我们应该将每个单独流中的行格式化为 CSV。 Alpakka 有一个 CSV 模块,我们可以使用

akka.stream.alpakka.csv.scaladsl.CsvFormatting.format([arguments])
Flow 来实现此目的。因此,我们将一些 CSV 格式的流发送到构建器(每个扇形流一个),并将每个分区的出口连接到其相应流的入口。 Flows 的出口在微风中悬挂。
接下来,我们应该将文件名附加到每个 CSV 格式的流并将它们压缩在一起。 Alpakka 有一个 Archive 模块,可以从多个源进行压缩。问题是 

.add

返回

akka.stream.alpakka.file.scaladsl.Archive.zip()
。它是一个源流(让我们忽略
Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed]
部分,它对应于文件名),为此我们需要一个源中的源来传输数据——或者至少是一个源集合(我们可以将其传递给
AchiveMetadata
)。
到目前为止,我们一直在构建的图表有一系列悬挂在微风中的插座。但我们只能从该构建块生成一个图,对吧?我们可以将其包装为没有入口和 N 个出口的 

Source.apply

图。问题是如何将其转换为源集合,每个上述 CSV 格式出口都有一个源集合。

或者还有另一种方式吗?请推荐。

附注查看Akka源代码,我们有:

AmorphousShape

我想要的是这样的:

/** * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function. */ def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val s = buildBlock(builder) createGraph(s, builder) }

不幸的是,
def createMultiple[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => Seq[S]): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val shapes = buildBlock(builder) shapes.map(createGraph(_, builder)) }

对于包来说是私有的。采用 Shape 的

new GraphDSL.Builder
也是私有的。哎呀。
另一个问题是:如果我们绕过 private 关键字,并且有多个扇出流(只有一个扇出流一切正常),那么对于构造的集合中的每个 

createGraph

,并非所有分区插座似乎已连接(因为其他插座是通过其他来源连接的),从而导致错误。我怀疑遍历是原因。

    

scala akka akka-stream alpakka
2个回答
1
投票

SourceShape

 附加到图中的松散出口:每个 
Sink.asPublisher 将具体化为 Reactive Streams 发布者(相当于 Akka
Sink.publisher
的 Reactive Streams)。然后可以使用
Source
将这些发布商转换为
Source
使用

Source.fromPublisher

重载需要一系列图来注入将具体化为注入图的物化值序列。由于该图的每个出口都被接收器覆盖,因此它本身有一个

GraphDSL.create
。将您的源附加到该接收器并SinkShape
以获取发布者序列。从那里开始,这只是 
run()
 的问题。

谢谢,利维。我实际上做了类似的事情,将渠道从图表中删除——尽管不是作为出版商。

0
投票
披露:我正在开发的服务实际上是用 Clojure 编写的。因此,我从 Clojure 调用 Scala,本质上是用 Clojure 语法编写 Scala 代码。 (当然,所有隐式都必须拼写出来。但是,嘿,Clojure 编译器很高兴地忽略了 Scala 处理

Source(publishers).map(Source.fromPublisher).via(Archive.zip())

关键字的方式。有时可能很有用。)

我所做的是使用 
private

为 Clojure 通道创建自定义源和接收器。有了这些接收器,我原来问题中描述的图表就完全封闭了。使用这些自定义源,我构建了一个更简单的图表,通过

GraphStage

(或者在 Clojure 中为 
Archive.zip
)汇集所有内容。 Clojure 通道充当两个互不相连的宇宙之间的虫洞(咳咳,Akka 图)。
(Archive/zip)

当然,存在这些 Clojure 虫洞如何处理背压的问题。但是当缓冲区不足时,Clojure 通道将阻塞接收器,这会对上游的所有内容产生反压。并不是说我期望对远程数据存储库的 JDBC 或 ElasticSearch 调用会比在内存中压缩更快。
    

© www.soinside.com 2019 - 2024. All rights reserved.