Giorgio

Reputation: 1093

Tuning Spark Job

I'm trying to tune the process below, because it keeps failing with a Java heap space error.

Looking at the Spark UI, there is a cogroup that behaves in a very strange way. Before that stage everything seems well balanced (at the moment I have a hardcoded number of partitions, 48). Inside the method loadParentMPoint there is the cogroup transformation, and when the next count is executed the cogroup is computed: 48 tasks are scheduled, but 47 of them terminate immediately (they seem to have nothing to process), while the single remaining task keeps doing shuffle read until it fills up the heap space and the exception is raised.

I have launched the process a few times with the same data set and the result is always the same: every time only one executor does any work, while everything before that point is well balanced.

Why am I getting this behavior? Am I missing something? I tried to repartition the data before the cogroup because I supposed it was unbalanced, but it didn't work; the same happened when I tried partitionBy.
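
The attempts looked roughly like this (a sketch rather than the exact code I ran; rddNew and rddRegistryFactoryParent are the two sides of the cogroup in the excerpt below, and the HashPartitioner with 48 partitions just mirrors the hardcoded partition count):

    import org.apache.spark.HashPartitioner

    // attempt 1: force a reshuffle of the left side right before the cogroup
    val rddNewRepartitioned = rddNew.repartition(48)

    // attempt 2: hash-partition both sides on the key explicitly, then cogroup
    val partitioner = new HashPartitioner(48)
    val cogrouped = rddNew
      .partitionBy(partitioner)
      .cogroup(rddRegistryFactoryParent.partitionBy(partitioner))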

This is the code excerpt:

    class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {

    implicit val ctx = sc
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
    val ipc = new Handler[ConsumptionComputationBigDataIPC]
    val billingOrderDao = new Handler[BillingOrderDao]
    val mPointDao = new Handler[MeasurementPointDAO]
    val billingOrderBDao = new Handler[BillingOrderBDAO]
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
    val ccmService = new Handler[ConsumptionComputationBillingService]
    val registry = new Handler[IncrementalRegistryTableData]
    val podTimeZoneHelper = new Handler[PodDateTimeUtils]
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
    val config = new Handler[PropertyManager]
    val paramFacade = new Handler[ConsumptionParameterFacade]
    val consumptionMethods = new Handler[ConsumptionMethods]
    val partitions = config.get.defaultPartitions()
    val appName = sc.appName
    val appId = sc.applicationId
    val now = new DateTime

    val extracted = ctx.accumulator(0l, "Extracted from planning")
    val generated = ctx.accumulator(0l, "Billing orders generated")
    val discarded = ctx.accumulator(0l, "Billing orders discarded")

    // initialize staging
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
    staging.prepareReading

    val rddExtractedFromPlanning = staging
        .read[ExtractedPO]()
        .repartition(48)
        .setName("rddExtractedFromPlanning")
        .cache 

    val rddExtracted = rddExtractedFromPlanning
      .filter { x =>
        extracted += 1
        (x.getExtracted == EExtractedType.EXTRACTED ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
      }
      .map { x =>
        log.info("1:extracted>{}", x)
        val bo = MapperUtil.mapExtractedPOtoBO(x)
        bo
      }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e =>
      val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
      val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
      log.info("2:last Billing order>{}", last);
      (e.getPod, e, last)
    }
      .setName("podWithExtractedAndLastBillingOrderPO")
      .cache()

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))

    val  rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
      .map(e => (e._1,1))
      .reduceByKey(_+_)
      .keys

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())

    val rddExtractedWithMPoint = ConsumptionComputationUtil
      .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
      .filter{ e =>
        val mPoint = e._3
        val condition = mPoint != null
        condition match {
          case false => log.error("MPoint is NULL for POD -> " + e._1)
          case true =>
        }
        condition
      }
      .setName("rddExtractedWithMPoint")
      .cache

    rddExtractedWithMPoint.count

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
      .groupWithParent(rddExtractedWithMPoint)
      .map{
        case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
          if (!parentMpointId.isEmpty) {
            val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
            log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
          } else {
            log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, null, factory)
          }
      }
      .setName("rddExtractedWithMPointWithParent")
      .cache()

    rddExtractedWithMPointWithParent.count

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
      .filter(e => Option(e._5).isDefined)
      .map(e => (e._5,1))
      .reduceByKey(_+_)
      .keys

    rddRegistryFactoryParentKeys.count

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())

    rddRegistryFactoryParent.count

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]

    val rddNew = rddExtractedWithMPointWithParent.map({
      case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
        (parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
    })
    rddNew.count

    val p = rddNew.cogroup(rddRegistryFactoryParent)
    p.count

    val rddExtractedWithMPointWithMpointParent = p
      .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
      .flatMap { case (pod, (inputs, mpFactories)) =>
        val factory = mpFactories.headOption // possibly one factory, or none
        val results = inputs.map { e =>
          val measurementPointTupla = factory.flatMap { f =>
            Option((imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f))
          }
          val tupla = measurementPointTupla.getOrElse(null)
          val toBeBilled = if (tupla != null && tupla._1 != null) false else true
          val m = if (tupla != null && tupla._1 != null) tupla._1 else null
          val f = if (tupla != null && tupla._2 != null) tupla._2 else null
          (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
        }
        results
      }
      .setName("rddExtractedWithMPointWithMpointParent")
      .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e =>
      log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
    })
    }
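
To check whether the input of the cogroup is really unbalanced, the key distribution feeding it can be inspected like this (a sketch; countByKey collects the per-key counts to the driver, so it is only practical while the number of distinct keys is small):

    // sketch: per-key record counts on both sides of the cogroup
    rddNew.countByKey().foreach { case (k, n) =>
      println(s"rddNew key=$k count=$n")
    }
    rddRegistryFactoryParent.countByKey().foreach { case (k, n) =>
      println(s"rddRegistryFactoryParent key=$k count=$n")
    }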

These are the stages for the two RDDs involved in the cogroup operation. rddNew:

[screenshot: stages for rddNew]

rddRegistryFactory:

[screenshot: stages for rddRegistryFactory]

And this is the stage of the cogroup:

[screenshot: cogroup stage]

This is the storage situation:

[screenshot: Storage tab]

And this is the Executors tab:

[screenshot: Executors tab]

N.B. I added the count actions just for debugging purposes.


Upvotes: 3

Views: 203

Answers (2)

Ram Ghadiyaram

Reputation: 29195

  • I strongly believe this Java heap space error is caused by the cached RDDs, which do not seem necessary judging by your last screenshot, the Storage tab.

[screenshot: Storage tab]

Depending on how many times a dataset is accessed and the amount of work involved in recomputing it, recomputation can be cheaper than the price paid by the increased memory pressure.

It should go without saying that if you only read a dataset once there is no point in caching it; it will actually make your job slower.
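
For example, in your job every intermediate RDD is cached and none of them is ever unpersisted. A sketch of releasing caches that are presumably no longer needed, using names from your code (whether each RDD is really safe to unpersist at that point is an assumption):

    // sketch: release caches once the downstream stage has been materialized
    rddExtractedWithMPoint.count()
    rddExtractedFromPlanning.unpersist()   // assumption: not read again after this point

    rddExtractedWithMPointWithParent.count()
    rddExtractedWithMPoint.unpersist()     // free executor memory before the cogroup shuffle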

  • For counts that exist only for debugging purposes you can use countApprox() instead of count(); once testing is done you can remove it for the real runs of your job (a short sketch follows the snippet below).

  • Most importantly, make sure that your data is uniformly distributed across partitions, by printing the number of records per partition; if needed, you can repartition or coalesce.

    You can get the number of records per partition like this:

    // note: converting an RDD back to a DataFrame with .toDF requires an
    // active SparkSession and `import spark.implicits._`
    df
      .rdd
      .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
      .toDF("partition_number", "number_of_records")
      .show()
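
A sketch of the approximate count mentioned above, applied to one of the RDDs from your code (the timeout and confidence values here are arbitrary):

    // returns a PartialResult within ~1 second at 90% confidence instead of doing a full count
    val approx = rddNew.countApprox(timeout = 1000L, confidence = 0.90)
    val bound  = approx.initialValue
    println(s"approx count: mean=${bound.mean}, low=${bound.low}, high=${bound.high}")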

Upvotes: 2

Giorgio

Reputation: 1093

I solved it: the problem was related to partitioning. Basically, the data in the RDD calling the cogroup operation had all keys with the same value, so when the cogroup happens, Spark tries to hash-partition both RDDs, bringing the keys of both RDDs onto the same executor in order to cogroup them.
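
A minimal sketch (with made-up data, not the real job) of why a single distinct key collapses the whole cogroup onto one task: HashPartitioner places each record by the key's hashCode modulo the number of partitions, so with only one distinct key every record of both RDDs lands in the same partition, and the one task reading that partition does all of the shuffle read.

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object CogroupSkewDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("cogroup-skew-demo").setMaster("local[4]"))

        // every record carries the same key, mimicking the degenerate key distribution
        val left  = sc.parallelize(1 to 100000).map(i => ("same-key", i))
        val right = sc.parallelize(1 to 1000).map(i => ("same-key", s"factory-$i"))

        val grouped = left.cogroup(right, new HashPartitioner(48))

        // only one of the 48 partitions holds anything; the other 47 tasks finish immediately
        grouped
          .mapPartitionsWithIndex { case (i, it) => Iterator((i, it.size)) }
          .collect()
          .foreach { case (i, n) => if (n > 0) println(s"partition $i holds $n group(s)") }

        sc.stop()
      }
    }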

Upvotes: 2
