我希望有一个带有两个入口的akka流运算符。在一个入口上,它接收有关消息的元数据。在第二个入口上,消息本身。
问题是,虽然一次收到一条消息的元数据,但消息被成批分组。
我希望操作员将所有元数据累积在第一个端口上,这样,当消息批处理进入第二个端口时,该操作员会发出两个批处理(可能是压缩的)。
显而易见的解决方案是只使用Zip
并将元数据分组到上游。但是问题是批次的大小未知。
感谢@LevyRamsey,我做到了。
将2个流的元素合并为元组流。来自左端口的元素将累积,直到右端口上的元素向量可用为止。
ZipAccumulateLeft
具有一个left
和一个right
输入端口以及一个out
端口''当所有输入都有可用的元素时发出]
'''''下游反压时的反压
'''完成'''任何上游操作后完成
'''''''下游取消时取消
https://gist.github.com/gurghet/31cd99a8441ba5fdd380fa7d95bdb628