Can user-defined aggregate functions be combined with Dataset.observe?


Can a user-defined aggregate function be used with Dataset.observe?

Spark's observe method looks like an interesting way to instrument a job, but I'd like to add more sophisticated aggregations than the ones available out of the box (for example, sampling a column with a sketch).

For that I need to be able to aggregate with a UDAF, but so far I haven't found a way to make it work.

I have implemented an Aggregator, but both of the following approaches fail:

// Attempt 1: register the Aggregator as a named UDAF and reference it in an expression
session.udf().register("myAgg", functions.udaf(new MyAggregator(), Encoders.STRING()));
Column aggCol = functions.expr("myAgg(inputCol)");

// Attempt 2: apply the UDAF column directly
Column aggCol = functions.udaf(new MyAggregator(), Encoders.STRING()).apply(ds.col("inputCol"));

and then

ds = ds.observe("foo", aggCol);
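For reference, the Aggregator is shaped like the sketch below. This is a hypothetical stand-in (a simple non-null count over a string column, not my real sketch-based logic), but any typed Aggregator plugged into observe this way hits the same problem:

```java
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// Minimal typed Aggregator<IN, BUF, OUT>: counts non-null string inputs.
// Aggregator already extends Serializable, so the class itself serializes fine.
public class MyAggregator extends Aggregator<String, Long, Long> {
  @Override public Long zero() { return 0L; }                 // initial buffer
  @Override public Long reduce(Long buffer, String input) {   // fold one row in
    return input == null ? buffer : buffer + 1;
  }
  @Override public Long merge(Long b1, Long b2) { return b1 + b2; } // combine partitions
  @Override public Long finish(Long reduction) { return reduction; } // final result
  @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
  @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
}
```

The same class works fine in an ordinary ds.agg(...) call; it only blows up when passed to observe.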

When a task completes, I get an exception because something in the task result is not serializable:

org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 4.0 (TID 2) had a not serializable result: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection, value: <function1>)
    - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, name: inputProjection, type: class org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
    - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, myAgg(input[5, string, true]))
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;, size 1)
    - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives, type: class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
    - object (class org.apache.spark.sql.execution.AggregatingAccumulator, AggregatingAccumulator(id: 111, name: Some(Collected metrics), value: [empty row]))
    - writeExternal data
    - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, org.apache.spark.scheduler.DirectTaskResult@7c17b6a8)
apache-spark