Spark Accumulator值不递增[重复]

问题描述 投票:0回答:3

我最近一直在研究Spark数据集,我有一个场景,我必须为每一行生成行号并将其存储在名为“Ids”的列中。此行号从1,2,3 ...开始,并根据数据集中的行数递增。 (在我的情况下有10000-20000条记录)

考虑一下,我有一个数据集'empDataset',其值为:

name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz

现在对于上面的数据集,我想添加一个列'Ids',其值从1,2,3开始增加等等。

预期的产出就是这个

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3

我还想将此输出存储在另一个数据集中,并进一步用于不同的转换。

需要帮助来解决这个问题陈述。!!

我的代码片段:

LongAccumulator  accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {

            @Override
            public Long call(String namCol) throws Exception {
                    accum.add(rowNumber);
                    System.out.println("inside" + accum.value());
                    return accum.value();
                }
        }, DataTypes.LongType);

Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator",
                col(name)));

Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)

我得到的输出是empDatasetWithIds(输出不正确):

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1

上面的代码在本地模式下运行时工作正常,但在集群模式下,值不会增加。

我还通过以下链接:https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html Spark Java Accumulator not incrementing

火花控制器需要采取措施来触发工作。在我的场景中,我进一步对数据集执行过滤器转换,我该如何解决这个问题。需要帮忙。

java apache-spark user-defined-functions apache-spark-dataset accumulator
3个回答
1
投票

累加器是用于在执行程序之间累积数据并将它们发送回驱动程序的变量。如果从执行程序读取其值,则不会定义行为(AFAIK)。我想你可能会得到到目前为止本地分区累积的内容。实际上,spark的目标是进行并行计算。因此,当使用累加器时,在单独的累加器中为每个分区累积数据,然后将这些数据合并并发送回驱动程序(map reduce范例)。因此,您不能使用累加器在执行程序之间共享信息。这不是它的意思

你可以做的是,如果你需要连续索引,可以使用RDD API中的zipWithIndex,如果你只需要增加索引,可以使用SparkSQL API中的monoticallyIncreasingId。前者触发一个小火花作业,而后者几乎是免费的(没有火花作业)。

选项1(增加但不一定是连续的指数)

yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());

备选方案2(连续和增加指数)

StructType schema = yourDataframe.schema();
schema.add(new StructField("id", DataTypes.LongType, false,null));
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
    .map(x -> {
         Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq());
         Long index = x._2;
         row.add(index);
         return RowFactory.create(row);
    });
Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);

0
投票

如果顺序提升方面不是问题,您可以按照以下方式执行:

import org.apache.spark.sql.functions.monotonically_increasing_id 
import spark.implicits._

val ds = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS   // Just a dummy DS

val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())

newds.show(false)

尝试并适应您自己的情况。

顺便说一句:错误使用累加器。


-1
投票

对于此功能,您可以使用row_number

import org.apache.spark.sql.expressions.Window
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids", 
    row_number().over(Window.orderBy(col("name"), col("dept"), col("project)))
)

参考:https://stackoverflow.com/a/31077759

正如在使用没有分区的窗口的注释中所指出的那样效率很低。并且应该避免在生产代码中处理大数据。

你使用累加器的方法不起作用(如Why does worker node not see updates to accumulator on another worker nodes?中所述),因为spark在不同的执行程序中运行此代码(在不同的机器上运行不同的jvm进程),并且如果累加器,每个都有自己的副本。

© www.soinside.com 2019 - 2024. All rights reserved.