Spark2.4.0源码——TaskScheduler
发布日期:2022-03-30 18:18:20 浏览次数:51 分类:博客文章

本文共 9583 字,大约阅读时间需要 31 分钟。

  概述

  TaskScheduler定义了对任务进行调度的接口规范,目前spark只有taskSchedulerImpl一个实现类,用于接收DAGScheduler发送的taskSets,并按照资源调度算法将资源分配给task并提交task到executor上执行。

  TaskSchedulerImpl通过taskSetManager来实现任务的推测执行和task本地性分配,任务推测执行就是当发现有运行较慢的task时,将该task发送到其他executor上执行,采用最先完成的执行结果,减少运行较慢的task对整个任务进度的影响;task本地性算法可以将task发送到与该task将要处理的数据所在节点最近的executor,减少网络数据传输,spark目前支持5种本地性级别:PROCESS_LOCAL(本地进程)、NODE_LOCAL(本地节点)、NO_PREF(没有偏好)、RACK_LOCAL(本地机架)、ANY(任何)。

  TaskSchedulerImpl还依赖一个后端接口SchedulerBackend,给task分配资源实际上是由这个后端接口完成的。

  TaskSchedulerImpl的初始化和启动

  spark程序创建的sparkContext内部会创建TaskSchedulerImpl并调用TaskSchedulerImpl的initialize和start方法。

  taskSchedulerImpl的属性之一,在创建taskSchedulerImpl的时候会创建根调度池,taskSchedulerImpl对task的调度依赖调度池Pool,需要被调度的task都会被放入调度池中,调度池pool根据调度算法(FIFO/FAIR)对taskSet调度,并将被调度的taskSet交给taskSchedulerImpl进行资源调度。

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

   initialize和start方法

def initialize(backend: SchedulerBackend) {    //将sparkContext内创建的SchedulerBackend赋给backend属性    this.backend = backend    //根据不同的schedulingMode创建不同的调度池构建器    schedulableBuilder = {      schedulingMode match {        case SchedulingMode.FIFO =>          new FIFOSchedulableBuilder(rootPool)        case SchedulingMode.FAIR =>          new FairSchedulableBuilder(rootPool, conf)        case _ =>          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +            s"$schedulingMode")      }    }    //构建调度池    schedulableBuilder.buildPools()  }  override def start() {    //启动schedulerBackend    backend.start()    //如果应用不是在local模式且开启了推测执行,设置一个执行间隔为SPECULATION_INTERVAL_MS(100ms)的检查可推测执行任务的定时器    if (!isLocal && conf.getBoolean("spark.speculation", false)) {      logInfo("Starting speculative execution thread")      speculationScheduler.scheduleWithFixedDelay(new Runnable {        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {          //检查可推测执行任务          checkSpeculatableTasks()        }      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)    }  }
View Code

  Task提交

  DAGScheduler在调用submitMissingTasks提交taskSet时,内部调用taskScheduler.submitTask方法,实现如下:

override def submitTasks(taskSet: TaskSet) {    //获取task    val tasks = taskSet.tasks    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")    this.synchronized {      val manager = createTaskSetManager(taskSet, maxTaskFailures)      val stage = taskSet.stageId      ////对当前TaskSet进行冲突检查,taskSetsByStageIdAndAttempt中不该有同属于当前stage但TaskSet却不同的情况      val stageTaskSets =        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])      stageTaskSets(taskSet.stageAttemptId) = manager      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>        ts.taskSet != taskSet && !ts.isZombie      }      if (conflictingTaskSet) {        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +          s" ${            stageTaskSets.toSeq.map {              _._2.taskSet.id            }.mkString(",")          }")      }            //将刚创建的taskSetManager添加到调度池构建器创建的调度池中      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)            //如果程序不是local模式且还未接收到task,就设置一个定时器按照STARVATION_TIMEOUT_MS指定的间隔检查taskSchedulerImpl的饥饿状况      if (!isLocal && !hasReceivedTask) {        starvationTimer.scheduleAtFixedRate(new TimerTask() {          override def run() {            if (!hasLaunchedTask) {              logWarning("Initial job has not accepted any resources; " +                "check your cluster UI to ensure that workers are registered " +                "and have sufficient resources")            } else {              this.cancel()            }          }        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)      }      //表示taskSchedulerImpl已经接收到task      hasReceivedTask = true    }    //给task分配资源并运行task    backend.reviveOffers()  }
View Code

  reviveOffers()方法是SchedulerBackend用于给task分配资源并运行task,这个后端接口并不由taskSchedulerImpl创建,而是在sparkContext创建时根据提交模式创建的SchedulerBackend的不同实现类传递给TaskSchedulerImpl的。

  以local模式为例,localSchedulerBackend的reviveoffers方法实际上会向LocalEndpoint发送reviveOffers事件,LocalEndpoint再调用自己的reviveOffers方法,内部再调用TaskSchedulerImpl的resourceOffer方法给task分配资源,最后调用Executor.launchTask加载并尝试执行task,实现如下:

def reviveOffers() {    //创建包含一个workerOffer样例类的序列,workerOffer的localExecutorId为driver、localExecutorHostname为localhost、cores为1    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,      Some(rpcEnv.address.hostPort)))    //调用taskSchedulerImpl.resourceOffers给task分配资源    for (task <- scheduler.resourceOffers(offers).flatten) {      //将空闲的CPU内核数-1      freeCores -= scheduler.CPUS_PER_TASK      //调用executor.launchTask方法加载task并执行      executor.launchTask(executorBackend, task)    }  }}
View Code

  资源分配

  TaskSchedulerImpl拿到包含WorkerOffers样例类的序列后会进行预处理,如更新host与executor、机架的映射关系用于task数据本地性的计算,workeroffers的随机shuffle保证任务均匀分配在worker节点,统计worker可用资源等,随后调用自己的ResourceOfferSingleTaskSet方法给TaskSet提供资源;

  ResourceOfferSingleTaskSet方法会获取WorkerOffers内的信息,如executor的身份标识、workerOffer的host,再对workerOffer的CPU检查,如果可用CPU大于task所需的CPU数,则执行以下操作:

  1、调用TaskSetManager的resourceOffer方法给待处理的task按照最大本地性创建TaskDescription

  2、将TaskDescription添加到tasks数组

  3、更新task的身份标识与taskSet、Executor的映射关系缓存

  4、CPU重计算,WorkerOffer的可用CPU减去task所用的CPU数,最后返回launchedTask,即是否给taskSet中的task分配了资源

  TaskSetManager.resourceOffer方法实现:

def resourceOffer(      execId: String,      host: String,      maxLocality: TaskLocality.TaskLocality)    : Option[TaskDescription] =  {    //获取黑名单    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>      blacklist.isNodeBlacklistedForTaskSet(host) ||        blacklist.isExecutorBlacklistedForTaskSet(execId)    }    //如果taskSetManager不是僵尸状态且要分配task的host和executor不是黑名单,执行以下操作    if (!isZombie && !offerBlacklisted) {      //获取执行时间和最大本地性      val curTime = clock.getTimeMillis()      var allowedLocality = maxLocality      //计算允许的最大本地性级别,如果最大本地性级别是NO_PREF,则允许的最大本地性级别为NO_PREF      //否则最大本地性级别为最大本地性级别maxLocality和getAllowedLocalityLevel获取的本地级别中的最小值      if (maxLocality != TaskLocality.NO_PREF) {        allowedLocality = getAllowedLocalityLevel(curTime)        if (allowedLocality > maxLocality) {          // We're not allowed to search for farther-away tasks          allowedLocality = maxLocality        }      }      //调用dequeueTask方法根据指定的host,executor和本地性级别,找出要执行的task的索引、相应的本地性和是否推断执行,返回三元组      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>        // Found a task; do some bookkeeping and return a task description        //根据要执行的task的索引找到task        val task = tasks(index)        //为task生成新的身份标识        val taskId = sched.newTaskId()        // Do various bookkeeping        //增加复制运行数        copiesRunning(index) += 1        //获取任务尝试号        val attemptNum = taskAttempts(index).size        //创建task尝试信息        val info = new TaskInfo(taskId, index, attemptNum, curTime,          execId, host, taskLocality, speculative)        //更新task与task尝试信息的映射关系        taskInfos(taskId) = info        taskAttempts(index) = info :: taskAttempts(index)        // Update our locality level for delay scheduling        // NO_PREF will not affect the variables related to delay scheduling        //如果最大本地性级别不是NO_PREF        if (maxLocality != TaskLocality.NO_PREF) {          //获取任务的本地性级别并将最后运行时间设置为当前系统时间          currentLocalityIndex = getLocalityIndex(taskLocality)          lastLaunchTime = curTime        }        // Serialize and return the task        //序列化task        val serializedTask: ByteBuffer = try {          ser.serialize(task)        } catch {          // If the task cannot be serialized, then there's no point to re-attempt the task,          // as it will always fail. So just abort the whole task-set.          case NonFatal(e) =>            val msg = s"Failed to serialize task $taskId, not attempting to retry it."            logError(msg, e)            abort(s"$msg Exception during serialization: $e")            throw new TaskNotSerializableException(e)        }                //task大小检查        if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&          !emittedTaskSizeWarning) {          emittedTaskSizeWarning = true          logWarning(s"Stage ${task.stageId} contains a task of very large size " +            s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")        }                //向runningTaskSet中添加task的身份标识,并增加调度池中记录的当前运行中任务的数量        addRunningTask(taskId)        // We used to log the time it takes to serialize the task, but task size is already        // a good proxy to task serialization time.        // val timeTaken = clock.getTime() - startTime        //生成task名称        val taskName = s"task ${info.id} in stage ${taskSet.id}"        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")          /**          * def taskStarted(task: Task[_], taskInfo: TaskInfo) {          *     eventProcessLoop.post(BeginEvent(task, taskInfo))          * }          * 向DAGSchedulerEventProcessLoop投递BeginEvent事件          */        sched.dagScheduler.taskStarted(task, info)        //创建并返回taskDescription对象        new TaskDescription(          taskId,          attemptNum,          execId,          taskName,          index,          task.partitionId,          addedFiles,          addedJars,          task.localProperties,          serializedTask)      }    } else {      None    }  }
View Code
 

转载地址:https://www.cnblogs.com/cnblogs-syui/p/11133475.html 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Spark2.4.0源码——DAGScheduler
下一篇:Python 设计模式--策略模式

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年04月20日 01时49分50秒