1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
* and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
*
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
*
* @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
* @param maxTaskFailures if any particular task fails this number of times, the entire
* task set will be aborted
*/

TaskSetManager 负责某个 TaskSet 的调度,对该 TaskSet 的所有 task 进行跟踪,如果有失败的 task,会负责重试(重试有上限),并且通过 delay scheduling(可以想想这个怎么实现的?) 实现 locality locality-aware scheduling. 主要的接口有 resourceOffer – 用于判断一个 TaskSet 中的 task 是否需要运行到某个 node 上,statusUpdate – 用于跟踪 task 的状态变化。不是线程安全的。

dequeeTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] 负责从对应的 list 中删除返回一个 pending Task,如果没有合适的 Task 就返回 None,该 function 中会将那些已经运行的 task 进行删除,会跳过所有的不能在对应 execId 上运行的 task(通过 executorIsBlacklisted(execId, index) 进行判断)

1
2
3
4
5
6
7
8
9
10
11
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!executorIsBlacklisted(execId, index)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
}

dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)]) 删除并返回一个可执行的 task,只返回符合 locality 约束的 task。首先逐个 locality 进行查找,如果有符合的 task 直接返回,否则返回一个合适的 推测执行的 task

executorIsBlacklisted(execId: String, taskId: Int): Boolean 进行判断某个 execId 上能否运行对应的 task(如果之前这个 taskId 在这个 execId 上运行失败了,而且当前时间和之间失败的时间差小于阈值 EXECUTOR_TASK_BLACKLIST_TIMEOUT

1
2
3
4
5
6
7
8
9
10
def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get
return failed.contains(execId) &&
clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}
false
}

dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] 负责删除并返回一个 推测执行的 task,如果没有合适的就返回 None。逻辑就是遍历所有 task,然后看 task 是否能运行在特定的 TaskLocality 上,如果可以就返回,并且将该 task 从推测执行的 task list 中删除。TaskLocality 的顺序为 {PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}

如何实施推测执行,逻辑在 checkSpeculatableTasks 函数中,

  1. 如果该 TaskSetManager 变为 zoombie 了,或者只包含一个 task,就不推测执行(为什么一个 task 就不推测执行)
  2. 如果完成的 task 数大于等于 minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt(其中 SPECULATION_QUANTILE 默认 0.75,可以通过 spark.speculation.quantile 设置)且有 task 成功执行过,则执行下面的步骤
  3. 将所有执行成功的 task 的执行时间进行排序,取第 val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1)) val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100) threshold 作为临界值,对每个 task 进行检测。
  4. 如果该 task 还没有运行成功,运行时间超过 threshold,只有一个实例在跑,而且没有推测执行过,就进行推测执行
  5. 推测执行保证同一个 task 的不同实例不会调度到同一台主机上,且不会调度到以及被加进黑名单的主机中

resourceOffer(execId: String,host: String,maxLocality:TaskLocality.TaskLocality): Option[TaskDescription] 负责资源的实际分配,如果当前 taskSetManager 不是 zoombie 状态才进行处理。首先找出当前时间可以被调度到的最高 Locality,然后使用 dequeuTask 删除并找到一个符合条件的 task,如果找到就更新相关的状态数据(包括,更新现在正在运行的 task 有哪些,更新当前的 locality,然后将 task 所需要的文件等序列化,附加到一个 TaskDescription 结构中并且返回),并且通知 DAGScheduler 该 task 已经开始运行。如果序列化有问题,则直接抛异常。

getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality 获取当前时间对应的一个 TaskLocality, 这里面会有时间等待(等待的时间就是每个 TaskLocality 的等待时间,默认 3s,可以配置)

handleTaskGettingResult 主要进行状态标记,然后通知 DAGScheduler
canFetchMoreResults(size: Long): Boolean 检测是否还能 fetch size 字节大小已经序列化后的数据,如果不能,就将该 taskSetManger 标记为 zoombie,并且通知 DAGScheduler 该 TaskSet 为失败

handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit 负责将一个 task 标记为成功,并且如果当前 TaskSet 所有 task 都运行完成,就标记为 zoombie 状态,并且通知 DAGScheduler。

handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) 将task 标记为失败,并且重新添加到 pendingTask 队列中,然后通知 DAGScheduler。根据失败的信息不同,做不同的处理:

  1. FetchFailure:直接想当前的 tasksetManager 标记为 zoombie,然后做一定的清理工作,就把当前的 tasksetManager 标记为成功
  2. ExceptionFailure:如果是 NotSerializableException 就直接退出,否则会打印相应异常,然后进行重试
  3. 其他的异常,打印日志

executorLost(execId: String, host: String, reason: ExecutorLossReason) 负责处理 executorLost 的情况,由 taskSchedulerImp 调用。逻辑如下

  • 如果是 ShufflleMapTask 且没有开启 externalShuffleServiceEnabled 就进行如下操作:如果 task 以及成功了,就将这些 task 标记为失败,且进行重试(因为后续的 task 需要从这些 task 中获取数据)
  • 如果是其他的,就直接调用 handleFailedTask 进行处理,然后重新计算 locality

getLocalityWait(level: TaskLocality.TaskLocality): Long = 用于获取 locality, 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
val defaultWait = conf.get("spark.locality.wait", "3s")
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}
if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}

疑问:

  1. 推测执行的时候,为什么 TaskSet 只有 1 个 task 的话就不推测执行
  2. 推测执行对每个 task 只会进行一次?
    a. 可以被 推测执行多次,只执行一次的逻辑是使用 speculatableTasks 做检测的,当运行一个 推测执行的 task 后,该 task 就会从 speculatableTasks 进行删除,然后就可以进行推测执行了。严格的说法是,只运行同一个 task 的一个实例在“排队等待被推测执行”
  3. 每个 task 的 preferredLocations 怎么得到的?根据什么规则?每一个的含义又是什么,总共有 {PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} 这些可选项
  4. 每个 TaskSet 的所有 task 都是一样的 locality?
  5. 推测执行的时候,如果最后执行成功多个 task,会对结果有影响吗?怎么规避这种影响的

Comments