Apache Spark Aggregator 类的 merge 方法采用两个缓冲区并将它们合并为一个。我可以重用其中一个缓冲区(可能修改它)而不是创建一个要返回的新缓冲区吗?
我在reduce 方法的文档中注意到了这一点:
为了性能,该函数可以修改 b 并返回它而不是 为 b 构造新对象。但是
merge上没有类似的消息。我还在 Spark 代码中发现了这个示例,它返回一个缓冲区而不是创建一个新缓冲区,所以我假设(至少)这是可能的。
该合并的 scaladoc 显示:
Merges an input aggregation object into aggregation buffer object and returns a new buffer object. For performance, the function may do in-place merge and return it instead of constructing new buffer object.
This is typically called when doing PartialMerge or Final mode aggregation.
Params:
buffer – the aggregation buffer object used to store the aggregation result.
input – an input aggregation object. Input aggregation object can be produced by de-serializing the partial aggregate's output from Mapper side.
即第一个参数是当前分区的缓冲区。该合并函数的实际调用者也在 TypedImperativeAggregate 中:
final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
val bufferObject = getBufferObject(buffer)
// The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
buffer(mutableAggBufferOffset) = merge(bufferObject, inputObject)
}
即从合并返回的缓冲区将被重新用作此分区缓冲区。您可以根据需要重复使用左侧或右侧的对象。