## 12.1 Spark Source Code Analysis

**Table of Contents**

- I. Spark Core Architecture and Initialization
  - 1.1 SparkContext Initialization Flow
  - 1.2 Runtime Environment Construction
- II. RDD Design and Implementation
  - 2.1 The RDD Core Abstraction
  - 2.2 The Five Key Properties of RDDs
  - 2.3 RDD Operation Execution
- III. Task Scheduling System
  - 3.1 DAGScheduler
  - 3.2 Stage Division Algorithm
  - 3.3 TaskScheduler Task Scheduling
  - 3.4 Task Execution Mechanism
  - 3.5 Task Dispatch and Scheduling Flow
  - 3.6 Fault Tolerance and Monitoring
  - 3.7 Failure Retry Mechanism
  - 3.8 RDD Lineage Recovery
- IV. Memory Management System
  - 4.1 Unified Memory Management
  - 4.2 Operator Memory Storage
  - 4.3 Memory Monitoring and Optimization
  - 4.4 Memory Management System (Advanced Features)
  - 4.5 Unified Memory Management (Detailed Implementation)
- V. Shuffle Mechanism Implementation
  - 5.1 Sort Shuffle Core
  - 5.2 UnsafeShuffleWriter
- VI. Storage System Design
  - 6.1 BlockManager Storage
  - 6.2 Caching Mechanism
- VII. Network Communication System
  - 7.1 Network Transport Service
  - 7.2 Block Transfer Mechanism
- VIII. Dynamic Resource Allocation
  - 8.1 Resource Allocation Strategy
  - 8.2 Dynamic Scaling Algorithm
- IX. Spark SQL Execution Engine
  - 9.1 Catalyst Optimizer Core
  - 9.2 Code Generation and Execution
  - 9.3 Columnar Storage and Vectorization
  - 9.4 Adaptive Query Execution (AQE)
  - 9.5 Window Function Internals (Lag as an Example)
- X. Broadcast Variables and Accumulators
  - 10.1 Broadcast Variable Implementation
  - 10.2 Accumulator Source Code Analysis
- XI. Checkpointing and Fault Tolerance
  - 11.1 Checkpoint Mechanism Implementation
  - 11.2 Failure Retry and Lineage Recovery
- XII. Cluster Manager Integration
  - 12.1 YARN Integration Source Code
  - 12.2 Kubernetes Integration

### I. Spark Core Architecture and Initialization

#### 1.1 SparkContext Initialization Flow

**SparkContext initialization flow diagram**

```mermaid
graph TD
    A[SparkContext constructor] --> B[Create SparkConf configuration]
    B --> C[Create SparkEnv runtime environment]
    C --> D[Create SparkStatusTracker]
    D --> E[Create TaskScheduler]
    E --> F[Create DAGScheduler]
    F --> G[Start TaskScheduler]
    G --> H[Set default parallelism]
    H --> I[SparkContext initialization complete]

    C --> C1[Create SerializerManager]
    C --> C2[Create BlockManager]
    C --> C3[Create MemoryManager]
    C --> C4[Create MetricsSystem]

    E --> E1[Create scheduler based on master URL]
    E1 --> E2[Standalone mode]
    E1 --> E3[YARN mode]
    E1 --> E4[Local mode]

    style A fill:#e1f5fe
    style I fill:#e8f5e8
    style C fill:#fff3e0
    style F fill:#f3e5f5
```

#### 1.2 Runtime Environment Construction

**SparkContext initialization source code analysis**

```scala
// SparkContext.scala — core initialization flow (simplified excerpt)
class SparkContext(config: SparkConf) extends Logging {

  // 1. Create SparkEnv — the core runtime environment
  private val env: SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, numCores, mockOutputCommitCoordinator)
  }

  // 2. Create the status tracker (backed by the AppStatusStore)
  private val statusTracker = new SparkStatusTracker(this, statusStore)

  // 3. Create the task scheduler (scheduler backend + TaskScheduler)
  private val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  private val taskScheduler = ts

  // 4. Create the DAG scheduler
  private val dagScheduler = new DAGScheduler(this)

  // 5. Start the task scheduler
  taskScheduler.start()

  // 6. Default parallelism is provided by the task scheduler
  private val defaultParallelism: Int = taskScheduler.defaultParallelism

  // Core method: create an RDD from a local collection
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

  // Core method: submit a job to the DAGScheduler
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    dagScheduler.runJob(rdd, func, partitions, callSite, resultHandler, localProperties.get)
  }
}
```
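To connect the initialization flow above with user code, here is a minimal, self-contained sketch (local mode is assumed; the object name `SparkContextInitDemo` is purely illustrative). Constructing the SparkContext runs the initialization chain from section 1.1, and the `collect()` action at the end goes through `SparkContext.runJob` into the DAGScheduler.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextInitDemo {
  def main(args: Array[String]): Unit = {
    // Building the conf and constructing the SparkContext triggers the chain above:
    // SparkEnv -> TaskScheduler -> DAGScheduler -> taskScheduler.start()
    val conf = new SparkConf().setAppName("init-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // parallelize() creates a ParallelCollectionRDD via the method shown above;
    // collect() is an action, so it flows through SparkContext.runJob -> DAGScheduler.runJob
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val doubled = rdd.map(_ * 2).collect()
    println(doubled.take(5).mkString(", "))

    sc.stop()
  }
}
```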
### II. RDD Design and Implementation

#### 2.1 The RDD Core Abstraction

**Implementation flow of the five key RDD properties**

```mermaid
graph LR
    A[RDD instantiation] --> B[getPartitions returns the partition list]
    B --> C[compute defines the per-partition computation]
    C --> D[getDependencies sets up parent dependencies]
    D --> E[partitioner sets an optional partitioner]
    E --> F[getPreferredLocations gives location preferences]
    F --> G[RDD created]

    style A fill:#e1f5fe
    style G fill:#e8f5e8
```

#### 2.2 The Five Key Properties of RDDs

**Core RDD source code**

```scala
// RDD.scala — the core abstraction (simplified excerpt)
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // The five key properties:

  // 1. A list of partitions
  protected def getPartitions: Array[Partition]

  // 2. A function for computing each partition
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // 3. A list of dependencies on other RDDs
  protected def getDependencies: Seq[Dependency[_]] = deps

  // 4. Optionally, a Partitioner (for key-value RDDs)
  @transient val partitioner: Option[Partitioner] = None

  // 5. Optionally, a list of preferred locations for each partition
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  // Transformation operations
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

  // Action operations
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
}

// Note: reduceByKey is not defined on RDD itself. It lives in PairRDDFunctions
// and becomes available on RDD[(K, V)] through an implicit conversion.
// PairRDDFunctions.scala (simplified excerpt)
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
```

#### 2.3 RDD Operation Execution

**RDD operation execution flow diagram**

```mermaid
graph TD
    A[RDD operation invoked] --> B{Operation type}
    B -->|Transformation| C[Create a new RDD]
    B -->|Action| D[Trigger job execution]

    C --> C1[Build RDD lineage]
    C1 --> C2[Set up dependencies]
    C2 --> C3[Return the new RDD object]
    C3 --> E[Wait for an action to trigger execution]

    D --> D1[Call SparkContext.runJob]
    D1 --> D2[DAGScheduler.runJob]
    D2 --> D3[Build the DAG]
    D3 --> D4[Divide into stages]
    D4 --> D5[Submit tasks]
    D5 --> D6[Execute on executors]
    D6 --> D7[Return results]

    style C fill:#e8f5e8
    style D fill:#ffebee
    style D3 fill:#fff3e0
    style D6 fill:#e1f5fe
```

### III. Task Scheduling System

#### 3.1 DAGScheduler

**DAGScheduler job submission flow diagram**

```mermaid
graph TD
    A[User invokes an action] --> B[SparkContext.runJob]
    B --> C[DAGScheduler.runJob]
    C --> D[Create ActiveJob]
    D --> E[submitJob]
    E --> F[Build the DAG]
    F --> G[findMissingPartitions]
    G --> H[getMissingParentStages]
    H --> I{Any parent stages}
    I -->|Yes| J[Recursively submit parent stages]
    I -->|No| K[submitMissingTasks]
    J --> L[Wait for parent stages to finish]
    L --> K
    K --> M[Create TaskSet]
    M --> N[TaskScheduler.submitTasks]
    N --> O[Dispatch tasks to executors]
    O --> P[Tasks finish]
    P --> Q[Stage completes]
    Q --> R[Check downstream stages]
    R --> S[Job completes]

    style A fill:#e1f5fe
    style F fill:#fff3e0
    style K fill:#e8f5e8
    style S fill:#c8e6c9
```

#### 3.2 Stage Division Algorithm

**Stage division algorithm flow diagram**

```mermaid
graph TD
    A[Start stage division] --> B[Start from the final RDD]
    B --> C[Walk the RDD dependencies]
    C --> D{Dependency type}
    D -->|Narrow dependency| E[Include in the current stage]
    D -->|Wide dependency| F[Create a new stage boundary]
    E --> G[Continue walking parent RDDs]
    F --> H[Create ShuffleMapStage]
    G --> C
    H --> I[Recursively process parent RDDs]
    I --> C
    C --> J{Unprocessed RDDs remaining}
    J -->|Yes| C
    J -->|No| K[Stage division complete]

    style A fill:#e1f5fe
    style F fill:#ffebee
    style H fill:#fff3e0
    style K fill:#e8f5e8
```
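As a concrete illustration of the division rule above, the following minimal sketch (assuming an already-created SparkContext named `sc`, e.g. in spark-shell) builds a lineage in which `map` stays inside one stage while `reduceByKey` introduces a shuffle dependency and therefore a new stage; `toDebugString` prints the resulting lineage.

```scala
// map creates a narrow dependency and stays in the same stage;
// reduceByKey introduces a ShuffleDependency, i.e. a stage boundary.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 2)

val counts = words
  .map(w => (w, 1))      // narrow dependency -> same stage
  .reduceByKey(_ + _)    // wide (shuffle) dependency -> new stage

// The indentation change in the printed lineage marks the
// ShuffleMapStage / ResultStage split that DAGScheduler will create.
println(counts.toDebugString)

counts.collect().foreach(println)  // the action that triggers DAGScheduler.runJob
```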
**DAGScheduler source code analysis**

```scala
// DAGScheduler.scala — core scheduling logic (simplified excerpt)
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  // Event processing loop
  private val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

  // Core job submission method
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format(
          waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format(
          waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

  // Core stage-division routine: get or create the ShuffleMapStage for a shuffle dependency
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
        // First create stages for any missing ancestor shuffle dependencies
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Then create the stage for this shuffle dependency itself
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

  // Find ancestor shuffle dependencies that do not yet have a registered stage
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
              ancestors.push(shuffleDep)
              waitingForVisit.push(shuffleDep.rdd)
            }
          case narrowDep: NarrowDependency[_] =>
            waitingForVisit.push(narrowDep.rdd)
        }
      }
    }
    ancestors
  }

  // Submit a stage, recursively submitting any missing parent stages first
  private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    }
  }
}
```

#### 3.3 Memory Monitoring and Optimization

**Memory storage status monitoring**

The following illustrative monitoring component (not part of Spark itself) shows how the memory usage of the aggregation maps and their spill behavior can be tracked:

```scala
// Memory-usage monitoring component (illustrative)
class MemoryMonitor {

  // Monitor the memory usage of a size-tracking map
  def monitorMapMemory(map: SizeTrackingAppendOnlyMap[_, _]): MemoryUsage = {
    val estimatedSize = map.estimateSize()
    val currentMemory = map.currentMemory
    val maxMemory = map.maxMemory

    MemoryUsage(
      estimatedSize = estimatedSize,
      currentMemory = currentMemory,
      maxMemory = maxMemory,
      utilization = currentMemory.toDouble / maxMemory)
  }

  // Monitor the spill status of an ExternalAppendOnlyMap
  def monitorSpillStatus(externalMap: ExternalAppendOnlyMap[_, _, _]): SpillStatus = {
    val spillCount = externalMap.spills.size
    val totalSpillSize = externalMap.spills.map(_.size).sum

    SpillStatus(
      spillCount = spillCount,
      totalSpillSize = totalSpillSize,
      averageSpillSize = if (spillCount > 0) totalSpillSize / spillCount else 0)
  }
}

case class MemoryUsage(
    estimatedSize: Long,
    currentMemory: Long,
    maxMemory: Long,
    utilization: Double)

case class SpillStatus(
    spillCount: Int,
    totalSpillSize: Long,
    averageSpillSize: Long)
```

**Memory storage monitoring flow diagram**

```mermaid
graph TD
    A[Input data] --> B[PartitionedAppendOnlyMap]
    B --> C{Enough memory}
    C -->|Yes| D[Aggregate in memory]
    C -->|No| E[Spill to disk]
    D --> F[Return results]
    E --> G[ExternalAppendOnlyMap]
    G --> H[Merge in-memory and on-disk data]
    H --> F

    I[MemoryMonitor] --> B
    I --> G
    J[SpillMonitor] --> E
```

**Memory storage optimization strategy**

```scala
// Memory allocation optimization (illustrative)
class MemoryOptimizer {

  // Dynamically adjust the memory threshold based on utilization and spill history
  def adjustMemoryThreshold(
      currentMemory: Long,
      maxMemory: Long,
      spillCount: Int): Long = {
    val utilization = currentMemory.toDouble / maxMemory

    if (utilization > 0.8 && spillCount > 0) {
      // High memory utilization with spills: lower the threshold
      (maxMemory * 0.6).toLong
    } else if (utilization < 0.5 && spillCount == 0) {
      // Low memory utilization with no spills: raise the threshold
      (maxMemory * 0.9).toLong
    } else {
      // Otherwise keep the current threshold
      (maxMemory * 0.8).toLong
    }
  }

  // Choose an initial capacity for the map based on the expected data size
  def optimizeInitialCapacity(dataSize: Long): Int = {
    val estimatedSize = (dataSize * 1.2).toInt
    math.max(64, math.min(estimatedSize, 1024 * 1024))
  }
}
```
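To make the threshold policy above concrete, here is a small usage sketch of the illustrative `MemoryOptimizer` defined above (both the class and this demo object are examples in this text, not part of Spark's own API):

```scala
object MemoryOptimizerDemo {
  def main(args: Array[String]): Unit = {
    val optimizer = new MemoryOptimizer
    val maxMemory = 512L * 1024 * 1024  // assume 512 MB of execution memory

    // ~90% utilization with spills observed -> threshold drops to 60% of max
    println(optimizer.adjustMemoryThreshold(460L * 1024 * 1024, maxMemory, spillCount = 3))

    // ~31% utilization with no spills -> threshold rises to 90% of max
    println(optimizer.adjustMemoryThreshold(160L * 1024 * 1024, maxMemory, spillCount = 0))

    // Anything in between keeps the default 80% threshold
    println(optimizer.adjustMemoryThreshold(330L * 1024 * 1024, maxMemory, spillCount = 1))
  }
}
```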
#### 3.4 TaskScheduler Task Scheduling

**DAG generation and dependency analysis**

Complete task submission flow diagram:

...