我正在尝试调整下面的过程,因为我有一个非常Java heap space error.
看看Spark UI,有一个cogroup
,表现得非常奇怪。在那个阶段之前,一切看起来都非常平衡(目前我有硬编码的分区数,48)。在方法loadParentMPoint
内部存在cogroup trasformation,基本上当我要执行下一个计数时,计算cogroup并且基本上安排了48个任务,但是其中47个任务立即终止(似乎没有任何东西可以处理),除了一个开始进行洗牌读取,直到它填满堆空间并引发异常。
我已使用相同的数据集启动了几次该过程,并且结尾总是相同的。每次它只运行一个执行器,而以前是很平衡的。
为什么我有这种行为?也许我错过了什么?我尝试在cogroup之前使用repartition
数据,因为我认为它是不平衡的,但它不起作用,当我尝试使用partitionBy
时也一样。
这是代码摘录:
class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {
implicit val ctx = sc
val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
val ipc = new Handler[ConsumptionComputationBigDataIPC]
val billingOrderDao = new Handler[BillingOrderDao]
val mPointDao = new Handler[MeasurementPointDAO]
val billingOrderBDao = new Handler[BillingOrderBDAO]
val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
val ccmService = new Handler[ConsumptionComputationBillingService]
val registry = new Handler[IncrementalRegistryTableData]
val podTimeZoneHelper = new Handler[PodDateTimeUtils]
val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
val config = new Handler[PropertyManager]
val paramFacade = new Handler[ConsumptionParameterFacade]
val consumptionMethods = new Handler[ConsumptionMethods]
val partitions = config.get.defaultPartitions()
val appName = sc.appName
val appId = sc.applicationId
val now = new DateTime
val extracted = ctx.accumulator(0l, "Extracted from planning")
val generated = ctx.accumulator(0l, "Billing orders generated")
val discarded = ctx.accumulator(0l, "Billing orders discarded")
// initialize staging
val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
staging.prepareReading
val rddExtractedFromPlanning = staging
.read[ExtractedPO]()
.repartition(48)
.setName("rddExtractedFromPlanning")
.cache
val rddExtracted = rddExtractedFromPlanning
.filter { x =>
extracted += 1
(x.getExtracted == EExtractedType.EXTRACTED ||
x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
}
.map { x =>
log.info("1:extracted>{}", x)
val bo = MapperUtil.mapExtractedPOtoBO(x)
bo
}
val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e =>
val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
log.info("2:last Billing order>{}", last);
(e.getPod, e, last)
}
.setName("podWithExtractedAndLastBillingOrderPO")
.cache()
val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))
val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
.map(e => (e._1,1))
.reduceByKey(_+_)
.keys
val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())
val rddExtractedWithMPoint = ConsumptionComputationUtil
.groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
.filter{ e =>
val mPoint = e._3
val condition = mPoint != null
condition match {
case false => log.error("MPoint is NULL for POD -> " + e._1)
case true =>
}
condition
}
.setName("rddExtractedWithMPoint")
.cache
rddExtractedWithMPoint.count
val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
.groupWithParent(rddExtractedWithMPoint)
.map{
case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
if (!parentMpointId.isEmpty) {
val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
(pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
} else {
log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
(pod, extracted, measurementPoint, billOrder, null, factory)
}
}
.setName("rddExtractedWithMPointWithParent")
.cache()
rddExtractedWithMPointWithParent.count
val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
.filter(e => Option(e._5).isDefined)
.map(e => (e._5,1))
.reduceByKey(_+_)
.keys
rddRegistryFactoryParentKeys.count
val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())
rddRegistryFactoryParent.count
val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]
val rddNew = rddExtractedWithMPointWithParent.map({
case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
(parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
})
rddNew.count
val p = rddNew.cogroup(rddRegistryFactoryParent)
p.count
val rddExtractedWithMPointWithMpointParent = p.filter{ case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
.flatMap{ case (pod, (inputs, mpFactories)) =>
val factory = mpFactories.headOption //eventually one or none factory
val results = inputs.map{e =>
val measurementPointTupla = factory.flatMap{f =>
Option(imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f)
}
val tupla = measurementPointTupla.getOrElse(null)
val toBeBilled = if(tupla!=null && tupla._1!=null) false else true
val m = if(tupla!=null && tupla._1!=null) tupla._1 else null
val f = if(tupla!=null && tupla._2!=null) tupla._2 else null
(e._1, e._2, e._3, e._4, m, toBeBilled, e._5 , f)
}
results
}
.setName("rddExtractedWithMPointWithMpointParent")
.cache()
rddExtractedWithMPointWithMpointParent.foreach({ e =>
log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
})
}
这些是cogroup操作中涉及的两个RDD的阶段,rddNew:
rddRegistryFactory:
这是团队的舞台:
这是存储情况:
这是执行程序选项卡:
注:我添加了count动作仅用于调试目的。
更新:
Java heap space error
是因为缓存的rdds,根据你的最后一个屏幕截图是存储选项卡似乎没有必要。根据访问数据集的次数以及执行此操作所涉及的工作量,重新计算可能比增加的内存压力所支付的价格更快。
不言而喻,如果你只读一个数据集,一旦没有缓存它,它实际上会使你的工作变慢。
countApprox()
而不是count
。一旦完成测试,您可以删除它以实际使用您的工作df .rdd .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))} .toDF("partition_number","number_of_records") .show
我解决了,问题与partitioning
有关。基本上,调用cogroup
操作的rdd中的数据具有相同值的所有键,因此当cogroup发生时,Spark尝试对两个RDD进行散列分区,将两个rdd的键带到同一个执行器上以便对它们进行组合。