Spark Aggregators 的 merge 方法中可以重用其中一个缓冲区吗?

问题描述 投票:0回答:1

Apache Spark Aggregator 类的 merge 方法采用两个缓冲区并将它们合并为一个。我可以重用其中一个缓冲区(可能修改它)而不是创建一个要返回的新缓冲区吗?

我在

reduce 方法的文档中注意到了这一点:

为了性能,该函数可以修改 b 并返回它而不是 为 b 构造新对象。

但是

merge上没有类似的消息。我还在 Spark 代码中发现了这个示例,它返回一个缓冲区而不是创建一个新缓冲区,所以我假设(至少)这是可能的。

apache-spark apache-spark-sql aggregate-functions user-defined-functions
1个回答
0
投票
merge 由实现 TypedImperativeAggregate 的 ComplexTypeAggregateExpression 或调用 toColumn 时包装在 TypedImperativeAggregate 中的 udaf.ScalaAggregator 调用。

该合并的 scaladoc 显示:

Merges an input aggregation object into aggregation buffer object and returns a new buffer object. For performance, the function may do in-place merge and return it instead of constructing new buffer object. This is typically called when doing PartialMerge or Final mode aggregation. Params: buffer – the aggregation buffer object used to store the aggregation result. input – an input aggregation object. Input aggregation object can be produced by de-serializing the partial aggregate's output from Mapper side.
即第一个参数是当前分区的缓冲区。

该合并函数的实际调用者也在 TypedImperativeAggregate 中:

final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = { val bufferObject = getBufferObject(buffer) // The inputBuffer stores serialized aggregation buffer object produced by partial aggregate val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset)) buffer(mutableAggBufferOffset) = merge(bufferObject, inputObject) }
即从合并返回的缓冲区将被重新用作此分区缓冲区。

您可以根据需要重复使用左侧或右侧的对象。

© www.soinside.com 2019 - 2024. All rights reserved.