Tuning a Spark job


I am trying to tune the process below, because it fails with a Java heap space error.

Looking at the Spark UI, there is a cogroup that behaves in a very strange way. Up to that stage everything looks well balanced (at the moment I have a hard-coded number of partitions, 48). Inside the method loadParentMPoint there is the cogroup transformation; basically, when I execute the next count, the cogroup is computed and 48 tasks are scheduled, but 47 of them terminate immediately (they seem to have nothing to process), while the remaining one starts doing shuffle read until it fills up the heap space and the exception is thrown.

I have launched the process several times with the same dataset, and it always ends the same way. Every time only one executor is working, whereas before that stage the load was well balanced.

Why am I getting this behavior? Am I missing something? I tried to repartition the data before the cogroup, because I supposed it was unbalanced, but it did not help; the same happened when I tried to use partitionBy.
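For reference, this is roughly what I tried before the cogroup (a simplified sketch with hypothetical RDD names, not the actual job code, which follows below):

    import org.apache.spark.HashPartitioner

    // Attempt 1: simply rebalance the left side before the cogroup.
    val leftRepartitioned = left.repartition(48)

    // Attempt 2: force the same hash partitioner on both pair RDDs, so that
    // the cogroup finds them already co-partitioned.
    val partitioner = new HashPartitioner(48)
    val leftPartitioned  = left.partitionBy(partitioner)
    val rightPartitioned = right.partitionBy(partitioner)

    val cogrouped = leftPartitioned.cogroup(rightPartitioned)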

Here is a code excerpt:

    class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {

    implicit val ctx = sc
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
    val ipc = new Handler[ConsumptionComputationBigDataIPC]
    val billingOrderDao = new Handler[BillingOrderDao]
    val mPointDao = new Handler[MeasurementPointDAO]
    val billingOrderBDao = new Handler[BillingOrderBDAO]
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
    val ccmService = new Handler[ConsumptionComputationBillingService]
    val registry = new Handler[IncrementalRegistryTableData]
    val podTimeZoneHelper = new Handler[PodDateTimeUtils]
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
    val config = new Handler[PropertyManager]
    val paramFacade = new Handler[ConsumptionParameterFacade]
    val consumptionMethods = new Handler[ConsumptionMethods]
    val partitions = config.get.defaultPartitions()
    val appName = sc.appName
    val appId = sc.applicationId
    val now = new DateTime

    val extracted = ctx.accumulator(0l, "Extracted from planning")
    val generated = ctx.accumulator(0l, "Billing orders generated")
    val discarded = ctx.accumulator(0l, "Billing orders discarded")

    // initialize staging
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
    staging.prepareReading

    val rddExtractedFromPlanning = staging
        .read[ExtractedPO]()
        .repartition(48)
        .setName("rddExtractedFromPlanning")
        .cache 

    val rddExtracted = rddExtractedFromPlanning
      .filter { x =>
        extracted += 1
        (x.getExtracted == EExtractedType.EXTRACTED ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
      }
      .map { x =>
        log.info("1:extracted>{}", x)
        val bo = MapperUtil.mapExtractedPOtoBO(x)
        bo
      }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e =>
      val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
      val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
      log.info("2:last Billing order>{}", last);
      (e.getPod, e, last)
    }
      .setName("podWithExtractedAndLastBillingOrderPO")
      .cache()

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))

    val  rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
      .map(e => (e._1,1))
      .reduceByKey(_+_)
      .keys

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())

    val rddExtractedWithMPoint = ConsumptionComputationUtil
      .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
      .filter{ e =>
        val mPoint = e._3
        val condition = mPoint != null
        condition match {
          case false => log.error("MPoint is NULL for POD -> " + e._1)
          case true =>
        }
        condition
      }
      .setName("rddExtractedWithMPoint")
      .cache

    rddExtractedWithMPoint.count

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
      .groupWithParent(rddExtractedWithMPoint)
      .map{
        case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
          if (!parentMpointId.isEmpty) {
            val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
            log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
          } else {
            log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, null, factory)
          }
      }
        .setName("rddExtractedWithMPointWithParent")
        .cache()

    rddExtractedWithMPointWithParent.count

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
      .filter(e => Option(e._5).isDefined)
      .map(e => (e._5,1))
      .reduceByKey(_+_)
      .keys

    rddRegistryFactoryParentKeys.count

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())

    rddRegistryFactoryParent.count

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]

    val rddNew = rddExtractedWithMPointWithParent.map({
      case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
        (parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
    })
    rddNew.count

    val p = rddNew.cogroup(rddRegistryFactoryParent)
    p.count

    val rddExtractedWithMPointWithMpointParent = p
      .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
      .flatMap { case (pod, (inputs, mpFactories)) =>
        val factory = mpFactories.headOption // possibly one factory, or none
        val results = inputs.map { e =>
          val measurementPointTupla = factory.flatMap { f =>
            Option(imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f)
          }
          val tupla = measurementPointTupla.getOrElse(null)
          val toBeBilled = if (tupla != null && tupla._1 != null) false else true
          val m = if (tupla != null && tupla._1 != null) tupla._1 else null
          val f = if (tupla != null && tupla._2 != null) tupla._2 else null
          (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
        }
        results
      }
      .setName("rddExtractedWithMPointWithMpointParent")
      .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e =>
      log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
    })
}

These are the stages of the two RDDs involved in the cogroup operation, rddNew:

[screenshot]

rddRegistryFactory:

[screenshot]

This is the stage of the cogroup:

[screenshot]

This is the storage situation:

[screenshot]

This is the executors tab:

[screenshot]

Note: I added the count actions only for debugging purposes.

Update:

  • I tried to remove the cache and launch the process again; now each executor has around 100 MB used for storing data, but the behavior is the same: the shuffle read happens on only one executor.
  • I also tried a join between the same two RDDs before the cogroup, just to find out whether the problem is related only to cogroup or whether it extends to all wide transformations; for the join the behavior was exactly the same.
Tags: performance, apache-spark, transformation, shuffle
2 Answers

3 votes
  • I strongly believe this Java heap space error is due to the cached RDDs, which, judging by your last screenshot (the Storage tab), do not seem necessary.


Depending on how many times a dataset is accessed and the amount of work involved in doing so, recomputation can be faster than the price paid by the increased memory pressure.

It should go without saying that if you only read a dataset once, there is no point in caching it; it will actually make your job slower.
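As a rough sketch of that idea (hypothetical RDD and function names, not taken from the code above):

    import org.apache.spark.storage.StorageLevel

    // Cache only what is reused by more than one action.
    val reused = expensiveRdd.persist(StorageLevel.MEMORY_AND_DISK)
    val total  = reused.count()     // first use
    val sample = reused.take(10)    // second use benefits from the cache
    reused.unpersist()              // release the memory once it is no longer needed

    // A dataset that feeds a single action should not be cached at all:
    val once = inputRdd.map(transform)   // no .cache() / .persist()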

  • For the debug counts you can use countApprox() instead of count (see the sketch after the snippet below). Once testing is done, you can remove it and actually use your job.
  • The most important thing is to make sure that your data is uniform, for example by printing the number of records per partition... If needed, you can repartition or coalesce. You can get the number of records per partition like this:
    // assumes a SparkSession named `spark` is in scope; import spark.implicits._ is needed for toDF
    df
      .rdd
      .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
      .toDF("partition_number", "number_of_records")
      .show
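A possible usage of countApprox for those debug counts (the timeout and confidence values are just examples):

    // countApprox launches the count but waits at most `timeout` ms;
    // initialValue then holds a BoundedDouble estimate at the requested confidence.
    val approx   = rddNew.countApprox(timeout = 2000L, confidence = 0.90)
    val estimate = approx.initialValue
    println(s"approximate count: ${estimate.mean} in [${estimate.low}, ${estimate.high}]")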

2 votes

I solved it: the problem was related to partitioning. Basically, the RDD calling the cogroup had all of its keys with the same value, so when the cogroup happens Spark tries to hash partition both RDDs, bringing the keys of both RDDs onto the same executor in order to cogroup them.
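A minimal way to reproduce the effect (not the original job, just hypothetical data built on the same sc):

    import org.apache.spark.HashPartitioner

    // Every record shares the same key, mimicking the situation described above.
    val skewed = sc.parallelize(1 to 1000000).map(i => ("SAME_KEY", i))

    // Hash partitioning sends all records with the same key to the same partition,
    // so 47 of the 48 partitions stay empty and a single executor does all the work.
    val partitioned = skewed.partitionBy(new HashPartitioner(48))

    partitioned
      .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
      .collect()
      .foreach { case (i, n) => println(s"partition $i -> $n records") }

    // A common mitigation for such extreme key skew is salting: map the key to
    // (key, random salt), cogroup on the salted key, then merge the partial results.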
