Spark DataFrame java.lang.OutOfMemoryError: GC overhead limit exceeded when running a long loop


I am running a Spark application (on a Spark 1.6.3 cluster) that performs some calculations on 2 small data sets and writes the result to Parquet files on S3.

Here is my code:

public void doWork(JavaSparkContext sc, Date writeStartDate, Date writeEndDate, String[] extraArgs) throws Exception {
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    S3Client s3Client = new S3Client(ConfigTestingUtils.getBasicAWSCredentials());

    boolean clearOutputBeforeSaving = false;
    if (extraArgs != null && extraArgs.length > 0) {
        if (extraArgs[0].equals("clearOutput")) {
            clearOutputBeforeSaving = true;
        } else {
            logger.warn("Unknown param " + extraArgs[0]);
        }
    }

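    // Process one day per iteration, from writeStartDate up to writeEndDate (exclusive)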
    Date currRunDate = new Date(writeStartDate.getTime());
    while (currRunDate.getTime() < writeEndDate.getTime()) {
        try {

            SparkReader<FirstData> sparkReader = new SparkReader<>(sc);
            JavaRDD<FirstData> data1 = sparkReader.readDataPoints(
                    inputDir,
                    currRunDate,
                    getMinOfEndDateAndNextDay(currRunDate, writeEndDate));
            // Normalize to 1 hour & 0.25 degrees
            JavaRDD<FirstData> distinctData1 = data1.distinct();

            // Floor all (distinct) values to 6 hour windows
            JavaRDD<FirstData> basicData1BySixHours = distinctData1.map(d1 -> new FirstData(
                    d1.getId(),
                    TimeUtils.floorTimePerSixHourWindow(d1.getTimeStamp()),
                    d1.getLatitude(),
                    d1.getLongitude()));

            // Convert Data1 to Dataframes
            DataFrame data1DF = sqlContext.createDataFrame(basicData1BySixHours, FirstData.class);
            data1DF.registerTempTable("data1");

            // Read Data2 DataFrame
            String currDateString = TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            String inputS3Path = basedirInput + "/dt=" + currDateString;
            DataFrame data2DF = sqlContext.read().parquet(inputS3Path);
            data2DF.registerTempTable("data2");

            // Join data1 and data2
            DataFrame mergedDataDF = sqlContext.sql("SELECT D1.Id,D2.beaufort,COUNT(1) AS hours " +
                    "FROM data1 as D1,data2 as D2 " +
                    "WHERE D1.latitude=D2.latitude AND D1.longitude=D2.longitude AND D1.timeStamp=D2.dataTimestamp " +
                    "GROUP BY D1.Id,D1.timeStamp,D1.longitude,D1.latitude,D2.beaufort");

            // Create histogram per ID
            JavaPairRDD<String, Iterable<Row>> mergedDataRows = mergedDataDF.toJavaRDD().groupBy(md -> md.getAs("Id"));
            JavaRDD<MergedHistogram> mergedHistogram = mergedDataRows.map(new MergedHistogramCreator());

            logger.info("Number of data1 results: " + data1DF.select("lId").distinct().count());
            logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());
            logger.info("Number of results with beaufort histograms: " + mergedDataDF.select("Id").distinct().count());

            // Save to parquet
            String outputS3Path = basedirOutput + "/dt=" + TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            if (clearOutputBeforeSaving) {
                writeWithCleanup(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext, s3Client);
            } else {
                write(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext);
            }
        } finally {
            TimeUtils.progressToNextDay(currRunDate);
        }
    }
}

public void write(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass, SQLContext sqlContext) {
    // Apply a schema to an RDD of JavaBeans and save it as Parquet.
    DataFrame fullDataDF = sqlContext.createDataFrame(outputRDD, outputClass);
    fullDataDF.write().parquet(outputS3Path);
}

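// Same as write(), but first deletes any existing output under outputS3Path from S3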
public void writeWithCleanup(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass,
                             SQLContext sqlContext, S3Client s3Client) {
    String fileKey = S3Utils.getS3Key(outputS3Path);
    String bucket = S3Utils.getS3Bucket(outputS3Path);

    logger.info("Deleting existing dir: " + outputS3Path);
    s3Client.deleteAll(bucket, fileKey);

    write(outputS3Path, outputRDD, outputClass, sqlContext);
}

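// Returns the start of the day after startTime, or proposedEndTime if that comes earlier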
public Date getMinOfEndDateAndNextDay(Date startTime, Date proposedEndTime) {
    long endOfDay = startTime.getTime() - startTime.getTime() % MILLIS_PER_DAY + MILLIS_PER_DAY;
    if (endOfDay < proposedEndTime.getTime()) {
        return new Date(endOfDay);
    }
    return proposedEndTime;
}

data1 has roughly 150,000 records and data2 roughly 500,000.

What my code basically does is some data manipulation, merging the 2 data objects, some more manipulation, printing a few statistics, and saving to Parquet.

Spark has 25 GB of memory per server, and the code runs fine. Each iteration takes about 2-3 minutes.

The problem starts when I run it over a large range of dates.

After a while, I get an OutOfMemoryError:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at org.json4s.JsonDSL$JsonListAssoc.$tilde(JsonDSL.scala:98)
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:139)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:72)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:38)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:87)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:71)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:70)

On the last run, it crashed after 233 iterations.

The line it crashed on is this one:

logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());

Can anyone tell me what might be causing the eventual crash?

java apache-spark garbage-collection out-of-memory spark-dataframe
2 Answers

0 votes

This error occurs when the GC takes up more than 98% of the process's total execution time. You can monitor the GC time in the Spark Web UI by going to the Stages tab at http://master:4040.
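
If the Web UI is not enough, one common way to see GC activity per executor is to enable the JVM's standard GC logging through spark.executor.extraJavaOptions. A minimal sketch, with flags for a Java 7/8 HotSpot JVM (adjust for your JVM; the rest of the submit command is omitted):

spark-submit \
    --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
    ...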

Try increasing the driver/executor memory (whichever one is producing this error) with the spark.{driver/executor}.memory conf when submitting the Spark application.
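
For example, a minimal spark-submit sketch; the memory sizes, class name, and jar name are placeholders rather than values from the question, and --driver-memory / --executor-memory are simply the command-line equivalents of spark.driver.memory / spark.executor.memory:

spark-submit \
    --class com.example.MyJob \
    --driver-memory 8g \
    --executor-memory 20g \
    my-job.jar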

Another thing to try is changing the garbage collector the JVM is using. Read this article: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html. It explains very clearly why the GC overhead error happens and which garbage collector is best suited to your application.
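
A sketch of how that is usually done, via the extraJavaOptions settings at submit time (switching to G1GC here is only an illustration, not a value taken from the question; the rest of the submit command is omitted):

spark-submit \
    --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
    --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
    ...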


0 votes

I'm not sure everyone will find this solution viable, but upgrading the Spark cluster to 2.2.0 seems to have fixed the issue.

I have been running my application for a few days now, and it hasn't crashed yet.
