Spark2.4.0源码——DAGScheduler
发布日期:2022-03-30 18:18:20 浏览次数:51 分类:博客文章

本文共 11774 字,大约阅读时间需要 39 分钟。

  前言

  Spark会将用户提交的作业看作一个job,在提交的时候首先将job转换为一系列的RDD,并按照RDD之间的依赖关系构建DAG(有向无环图),DAGScheduler会按照RDD依赖的不同将DAG划分为不同的stage,每个stage内部按照RDD分区数创建多个task,最后将task封装成taskSets发送到TaskScheduler调度执行。

  RDD依赖

  窄依赖(NarrowDependency):下游RDD与上游RDD的分区是一一对应的关系;

  宽依赖(ShuffleDependency):下游RDD的一个分区对应上游RDD的多个分区。

  Partitioner

  如果RDD间的依赖关系是Shuffle依赖,spark通过分区计算器partitioner来确定上游RDD的分区数据输出到下游RDD的哪个分区。

  spark大部分API默认采用HashPartitioner,将key的hashCode值与分区数进行取模运算后得到该key的分区下标;HashPartitioner的劣势:Java中数组的hashCode是基于数组对象本身的,如果key的数据类型为数组,数组内容一样的key可能会被分到不同的分区。

  RangePartitioner:先从整个RDD中抽取样本并排序取得一个Array[key]类型的数组变量rangeBounds,判断key在rangeBounds中的范围,得到该key在下游RDD中分区的下标。

  当然也可以自定义partitioner,写一个类继承自抽象类Partitioner并重写numPartitions、getPartition和equals方法即可。

  DAGScheduler

  DAGScheduler会将DAG中的RDD按照依赖类型划分为不同的stage,并构建这些stage的依赖关系,让没有依赖的stage并行计算,有依赖的stage串行运算,提升资源使用率和运行效率;stage分为最下游的ResultStage和需要shuffle的ShuffleMapStage。

  1、JobListener & JobWaiter & ActiveJob & DAGSchedulerEventProcessLoop

  特质JobListener用于对作业中每个task执行结果进行监听,JobWaiter实现了JobListener并最终确定作业的成功或失败;ActiveJob表示已经激活的Job;DAGSchedulerEventProcessLoop是DAGScheduler内部的事件循环处理器,spark组件都通过向DAGScheduler投递DAGSchedulerEvent来使用DAGScheduler,DAGSchedulerEventProcessLoop将处理这些Event并调用DAGScheduler的不同方法,它内部有三个方法:doOnReceive()用于调用DAGScheduler的方法处理不同的事件、onError()和onStop()。

  2、提交Job

  spark程序中每触发一次action算子,就会提交一次job,job会被转换成一系列RDD并调用DAGScheduler.runJob方法,内部调用submitJob方法。

def runJob[T, U](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      resultHandler: (Int, U) => Unit,      properties: Properties): Unit = {    //生成运行job的启动时间    val start = System.nanoTime    //提交job,submitJob是异步执行的,所以会立刻返回JobWaiter对象    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)    //等待job处理,如果成功打印日志,如果失败打印日志并抛出异常    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)    waiter.completionFuture.value.get match {      case scala.util.Success(_) =>        logInfo("Job %d finished: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))      case scala.util.Failure(exception) =>        logInfo("Job %d failed: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.        val callerStackTrace = Thread.currentThread().getStackTrace.tail        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)        throw exception    }  }
View Code

  submitJob方法内部根据job创建jobWaiter,并向DAGSchedulerEventProcessLoop发送jobSubmitted事件。

def submitJob[T, U](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      resultHandler: (Int, U) => Unit,      properties: Properties): JobWaiter[U] = {    // Check to make sure we are not launching a task on a partition that does not exist.    //获取当前job的最大分区数并检查,如果有不存在的分区则抛出异常    val maxPartitions = rdd.partitions.length    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>      throw new IllegalArgumentException(        "Attempting to access a non-existent partition: " + p + ". " +          "Total number of partitions: " + maxPartitions)    }    //生成下一个job的ID    val jobId = nextJobId.getAndIncrement()    //如果job的分区数为0,创建一个totalTasks属性为0的JobWaiter并返回,JobWaiter会将jobPromise设置为Success表示job处理成功    if (partitions.size == 0) {      // Return immediately if the job is running 0 tasks      return new JobWaiter[U](this, jobId, 0, resultHandler)    }    //如果job的分区数大于0,创建一个等待执行完成的JobWaiter,向DAGSchedulerEventProcessLoop发送JobSubmited事件    assert(partitions.size > 0)    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)    eventProcessLoop.post(JobSubmitted(      jobId, rdd, func2, partitions.toArray, callSite, waiter,      SerializationUtils.clone(properties)))    //返回JobWaiter对象    waiter  }
View Code

  DAGSchedulerEventProcessLoop接收到JobSubmited事件后会调用DAGScheduler的handleJobSubmited方法,创建ResultStage并提交,实现如下

private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      callSite: CallSite,      listener: JobListener,      properties: Properties) {    var finalStage: ResultStage = null    //创建ResultStage,期间可能会发生异常,比如运行在HadoopRDD上的任务依赖的HDFS文件被删除了,异常发生时需要调用JobWaiter的jobFailed方法    try {      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)    } catch {      ...    }        // Job submitted, clear internal data.    barrierJobIdToNumTasksCheckFailures.remove(jobId)    //创建ActiveJob    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)    //清除cacheLocs缓存中RDD的分区位置信息    clearCacheLocs()    logInfo("Got job %s (%s) with %d output partitions".format(      job.jobId, callSite.shortForm, partitions.length))    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")    logInfo("Parents of final stage: " + finalStage.parents)    logInfo("Missing parents: " + getMissingParentStages(finalStage))        //生成job提交的时间    val jobSubmissionTime = clock.getTimeMillis()    //将jobID与刚创建的activeJob之间的对应关系放入jobIdToActiveJob中    jobIdToActiveJob(jobId) = job    //将刚创建的activeJob放入activeJobs集合    activeJobs += job    //让finalStage的_activeJob属性持有刚创建的activeJob    finalStage.setActiveJob(job)    //获取当前job的所有stage对应的stageinfo    val stageIds = jobIdToStageIds(jobId).toArray    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))    //向LiveListenerBus投递sparkListenerJobStart事件,缓存stageId    listenerBus.post(      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))    //提交ResultStage    submitStage(finalStage)  }
View Code

 3、构建Stage

private def createResultStage(      rdd: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      jobId: Int,      callSite: CallSite): ResultStage = {    //RDD的一些初始检查    checkBarrierStageWithDynamicAllocation(rdd)    checkBarrierStageWithNumSlots(rdd)    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)    //获取所有父stage的列表    val parents = getOrCreateParentStages(rdd, jobId)    //生成resultStage的标识    val id = nextStageId.getAndIncrement()    //创建ResultStage    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)    //将ResultStage与它的ID对应关系添加到stageIdToStage缓存中    stageIdToStage(id) = stage    //更新job的身份标识与ResultStage的映射关系    updateJobIdStageIdMaps(jobId, stage)    stage  }  //获取或创建给定RDD的所有父stage,这些stage将被分配给jobId对应的jobprivate def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {    getShuffleDependencies(rdd).map { shuffleDep =>      getOrCreateShuffleMapStage(shuffleDep, firstJobId)    }.toList  }  //获取给定RDD直接父类的shuffle依赖private[scheduler] def getShuffleDependencies(      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {    val parents = new HashSet[ShuffleDependency[_, _, _]]    val visited = new HashSet[RDD[_]]    val waitingForVisit = new ArrayStack[RDD[_]]    waitingForVisit.push(rdd)    while (waitingForVisit.nonEmpty) {      val toVisit = waitingForVisit.pop()      if (!visited(toVisit)) {        visited += toVisit        toVisit.dependencies.foreach {          case shuffleDep: ShuffleDependency[_, _, _] =>            parents += shuffleDep          case dependency =>            waitingForVisit.push(dependency.rdd)        }      }    }    parents  }  private def getOrCreateShuffleMapStage(      shuffleDep: ShuffleDependency[_, _, _],      firstJobId: Int): ShuffleMapStage = {    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {        //如果已经创建了shullfeDependecy对应的shullfeMapStage,则直接返回shullfeMapStage      case Some(stage) =>        stage        //否则调用getMissingAncestorShuffleDependencies找到所有未创建shuffleMapStage的shullfeDependecy        //并调用createShuffleMapStage创建shullfeMapStage并注册      case None =>        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {            createShuffleMapStage(dep, firstJobId)          }        }        // 最后为当前shullfeDependecy创建shullfeMapStage并注册        createShuffleMapStage(shuffleDep, firstJobId)    }  } private def getMissingAncestorShuffleDependencies(      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]    val visited = new HashSet[RDD[_]]    // We are manually maintaining a stack here to prevent StackOverflowError    // caused by recursively visiting    val waitingForVisit = new ArrayStack[RDD[_]]    waitingForVisit.push(rdd)    while (waitingForVisit.nonEmpty) {      val toVisit = waitingForVisit.pop()      if (!visited(toVisit)) {        visited += toVisit        getShuffleDependencies(toVisit).foreach { shuffleDep =>          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {            ancestors.push(shuffleDep)            waitingForVisit.push(shuffleDep.rdd)          } // Otherwise, the dependency and its ancestors have already been registered.        }      }    }    ancestors  }def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {    //获取shuffleDependecy的rdd属性作为将要创建shuffleMapStage的rdd    val rdd = shuffleDep.rdd    //rdd初始检查    checkBarrierStageWithDynamicAllocation(rdd)    checkBarrierStageWithNumSlots(rdd)    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)    //获取rdd的分区数组的长度作为要创建的shullfeMapStage的task的数量    val numTasks = rdd.partitions.length    //获取将要创建的shullfeMapStage的所有父stage    val parents = getOrCreateParentStages(rdd, jobId)    //生成将要创建的shullfeMapStage的身份标识    val id = nextStageId.getAndIncrement()    //创建shullfeMapStage    val stage = new ShuffleMapStage(      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)    //更新刚创建的shuffleMapStage的映射关系    stageIdToStage(id) = stage    shuffleIdToMapStage(shuffleDep.shuffleId) = stage    updateJobIdStageIdMaps(jobId, stage)    //mapStatus的检查,因为stage可以重试,所以当前stage可能已经执行过,有部分map任务可能执行成功并将mapStatus更新到    //mapOutputTrackerMaster的缓存,当前stage只要复制这些mapStatus即可,避免重复计算    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {      // Kind of ugly: need to register RDDs with the cache and map output tracker here      // since we can't do it in the RDD constructor because # of partitions is unknown      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)    }    stage  }
View Code

 4、提交ResultStage

  submitStage方法会通过入参ResultStage逐层获取父stage,再从最上游stage开始逐步调用TaskScheduler.submitTasks方法提交task集合,最后才提交ResultStage的task集合。

private def submitStage(stage: Stage) {    //获取当前stage对应的jobId    val jobId = activeJobForStage(stage)    //判断获取到的jobId是否能找到对应的ActiveJob    if (jobId.isDefined) {      logDebug("submitStage(" + stage + ")")      //如果当前stage还未提交      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {        //获取当前stage的所有未提交的父stage        val missing = getMissingParentStages(stage).sortBy(_.id)        logDebug("missing: " + missing)        //如果没有未提交的父stage,就提交当前stage中所有未提交的task;否则先提交所有未提交的fustage,并将当前stage加入waitingSrages集合        if (missing.isEmpty) {          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")          submitMissingTasks(stage, jobId.get)        } else {          for (parent <- missing) {            submitStage(parent)          }          waitingStages += stage        }      }    } else {  //如果没有找到当前stage对应的ActiveJob,终止依赖于当前stage的所有Job      abortStage(stage, "No active job for stage " + stage.id, None)    }  }
View Code

  

转载地址:https://www.cnblogs.com/cnblogs-syui/p/11115380.html 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Spark2.4.0源码——RpcEnv
下一篇:Spark2.4.0源码——TaskScheduler

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月03日 17时38分25秒