这是一个简单的场景。
让我们从单个 Akka 源开始:比方说,从数据库检索的行。基于分区函数,不同的行需要被转移到不同的流中。因此,我们使用扇出运算符,为此我们需要使用 GraphDSL(不能仅使用
.via
和 .to
等简单函数)。
因此,在
GraphDSL.create() { builder => [block]}
的构建器块中,我们创建UniformFanOutShape
的分区并将原始源的出口与分区的入口连接。目前,分区的出口正悬在微风中。
接下来,我们应该将每个单独流中的行格式化为 CSV。 Alpakka 有一个 CSV 模块,我们可以使用
akka.stream.alpakka.csv.scaladsl.CsvFormatting.format([arguments])
Flow 来实现此目的。因此,我们将一些 CSV 格式的流发送到构建器(每个扇形流一个),并将每个分区的出口连接到其相应流的入口。 Flows 的出口在微风中悬挂。接下来,我们应该将文件名附加到每个 CSV 格式的流并将它们压缩在一起。 Alpakka 有一个 Archive 模块,可以从多个源进行压缩。问题是 .add
返回
akka.stream.alpakka.file.scaladsl.Archive.zip()
。它是一个源流(让我们忽略 Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed]
部分,它对应于文件名),为此我们需要一个源中的源来传输数据——或者至少是一个源集合(我们可以将其传递给 ) AchiveMetadata
)。到目前为止,我们一直在构建的图表有一系列悬挂在微风中的插座。但我们只能从该构建块生成一个图,对吧?我们可以将其包装为没有入口和 N 个出口的 Source.apply
图。问题是如何将其转换为源集合,每个上述 CSV 格式出口都有一个源集合。
或者还有另一种方式吗?请推荐。附注查看Akka源代码,我们有:
AmorphousShape
我想要的是这样的:
/**
* Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function.
*/
def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = {
val builder = new GraphDSL.Builder
val s = buildBlock(builder)
createGraph(s, builder)
}
不幸的是,
def createMultiple[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => Seq[S]): Graph[S, NotUsed] = {
val builder = new GraphDSL.Builder
val shapes = buildBlock(builder)
shapes.map(createGraph(_, builder))
}
对于包来说是私有的。采用 Shape 的
new GraphDSL.Builder
也是私有的。哎呀。另一个问题是:如果我们绕过 private 关键字,并且有多个扇出流(只有一个扇出流一切正常),那么对于构造的集合中的每个 createGraph
,并非所有分区插座似乎已连接(因为其他插座是通过其他来源连接的),从而导致错误。我怀疑遍历是原因。
附加到图中的松散出口:每个
Sink.asPublisher
将具体化为 Reactive Streams 发布者(相当于 Akka Sink.publisher
的 Reactive Streams)。然后可以使用 Source
将这些发布商转换为 Source
。使用Source.fromPublisher
重载需要一系列图来注入将具体化为注入图的物化值序列。由于该图的每个出口都被接收器覆盖,因此它本身有一个
GraphDSL.create
。将您的源附加到该接收器并SinkShape
以获取发布者序列。从那里开始,这只是
run()
的问题。谢谢,利维。我实际上做了类似的事情,将渠道从图表中删除——尽管不是作为出版商。
Source(publishers).map(Source.fromPublisher).via(Archive.zip())
我所做的是使用
private
为 Clojure 通道创建自定义源和接收器。有了这些接收器,我原来问题中描述的图表就完全封闭了。使用这些自定义源,我构建了一个更简单的图表,通过 GraphStage
(或者在 Clojure 中为
Archive.zip
)汇集所有内容。 Clojure 通道充当两个互不相连的宇宙之间的虫洞(咳咳,Akka 图)。
(Archive/zip)
当然,存在这些 Clojure 虫洞如何处理背压的问题。但是当缓冲区不足时,Clojure 通道将阻塞接收器,这会对上游的所有内容产生反压。并不是说我期望对远程数据存储库的 JDBC 或 ElasticSearch 调用会比在内存中压缩更快。