一丶目标
二丶 弹性分布式数据集
2. 弹性分布式数据集RDD
- RDD(Resilient Distributed Dataset)
- Resilient: 弹性的 –> 表示最终数据可以保存在磁盘,也可以保存在内存中
- Distributed: 数据分布式存储,并且可以做分布式计算
- Dataset: 数据集,可以理解为集合,存放数据
概述:
- 分布式数据集,是Spark中最基本的数据抽象,代表一个不可变,可分区,里面的元素可并行计算的集合
自动容错,位置感知性调度,可伸缩性
3.RDD的五大特性
- A list of partitions
每个RDD都有一个分区列表 - A function for computing each split
作用在每个分区上的函数 - A list of dependencies on other RDDs
一个RDD依赖其他多个RDD,重要 - Optionally, a Partitioner for key-value RDDs
可选项: 针对于kv键值对的RDD才具有该分区的特性 - Optionally, a list of preferred locations to compute each split on
可选项: 数据本地性,数据最优,选择尽量存储在worker节点上的数据节点
4. 创建RDD
由一个已经存在的scala集合创建
1
val rdd1 = sc.parallelize(Array(1,2,3,4,5))
由外部存储系统的数据集创建
1
val rdd2 = sc.textFile("/wc.txt")
由一个RDD转换成一个新的RDD
1
val rdd3 = rdd2.flatMap
5. RDD算子的分类
- Transformation : 转换操作,将一个RDD转换成一个新的RDD(flatMap/map/reduceByKey),它属于懒加载,延迟执行,并不会立即出发任务的执行.
- Action : 此时才会真正出发任务的计算
6. RDD的两种依赖关系
- 窄依赖
- 每一个父RDD的Partition最多只被子RDD的一个Partition使用
- 宽依赖
- 子RDD的Partition会依赖于父RDD的多个Partition
7. Lineage(血统)
RDD的Lineage会记录当前作用在RDD上的分区数据(元数据) 和一系列的转换行为,当子RDD中的某个分区数据丢失之后,只需要通过Lineage来重新计算恢复当前丢失数据的分区(Spark具备容错机制)
8. RDD的缓存
当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并对此或衍生出的RDD进行其他动作中重用
RDD的持久化和缓存,是Spark重要的特征之一,可以说,Spark是构建迭代算法和交互式查询的关键
8.1 RDD的缓存方式
通过执行cache和persist方法,可以将前面的计算结果缓存,但并不是在方法调用时缓存,而是在后面出发action时,该RDD被缓存在计算节点的内存中
- cache : 直接将RDD中的数据,保存在内存中,其本质是persist(StorageLevel.MEMORY_ONLY)
- persist : 可以有丰富的缓存界别
- 当RDD设置了缓存之后,如果有需要用到该RDD数据的时候,就不需要重复计算,可以直接重缓存中得到.
9. DAG的生成
- DAG: 有向无环图, 原始的RDD进过一系列转换后形成DAG,根据RDD之间的依赖关系不同将DAG划分成不同的Stage
- 宽依赖是划分DAG的依据
- 根据DAG有向无环图,来划分stage
- 依据:
- 从最后一个算子往前推,依次判断当前算子是否是宽依赖,如果不是,继续往前推,如果遇到宽依赖的算子,就从对应这个宽依赖划分,把从最后一个算子,一直到第一个宽依赖算子,划分为一个stage.
- 接下来继续按照这种方式往前推,直到最开始的算子操作
- 划分stage目的:
- 可以让每一个阶段并行的进行任务计算
- 后一个stage依赖与前一个stage
- 依据:
10. Spark任务调度
各个RDD之间存在着依赖关系,这些依赖关系形成有向无环图DAG,DAGScheduler对这些依赖关系形成的DAG进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。完成了Stage的划分,DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskScheduler 负责具体的task调度,最后在Worker节点上启动task。
10.1 DAGScheduler
(1)DAGScheduler构建Stage
(2)记录哪个RDD或者 Stage 输出被物化(缓存),通常在一个复杂的shuffle之后,通常物化一下(cache、persist),方便之后的计算。
(3)重新提交shuffle输出丢失的stage(stage内部计算出错)
(4)将 Taskset 传给底层调度器
a)– spark-cluster TaskScheduler
b)– yarn-cluster YarnClusterScheduler
c)– yarn-client YarnClientClusterScheduler
10.2 TaskScheduler
(1)为每一个TaskSet构建一个TaskSetManager 实例管理这个TaskSet 的生命周期
(2)数据本地性决定每个Task最佳位置
(3)提交 taskset( 一组task) 到集群运行并监控
(4)推测执行,碰到 straggle(计算缓慢)任务需要放到别的节点上重试
(5)重新提交Shuffle输出丢失的Stage给DAGScheduler
三丶RDD容错机制之checkpoint
11. checkpoint
- 会对数据进行持久化操作,保存在HDFS
- 使用的时候:
- 需要sc.setCheckpointDir 来设置一个检查点目录
- 对需要缓存的RDD调用checkpoint
- 注意: 在执行数据缓存时,需要有对应action算子操作,才会真正出发持久化操作
- 在做checkPoint操作时,此时会先执行对应出发action算子的RDD的结果,计算完成之后,又会开辟一个新的job来计算你设置了checkPoint的RDD的结果
- 在设置了checkpoint之后,对应的这个RDD会改变之前的依赖关系,如果当前数据丢失,只有重新计算得到.
如何使用checkpoint?
- 调用sparkContext的setCheckPoint方法,设置容错文件系统目录,比如hdfs
- 对RDD调用checkPoint方法
- 在RDD所处的job运行结束后,启动一个单独的job,将checkpoint过的数据写入文件系统持久化,进行高可用
persist或者cache与checkpoint的区别?
- 前者持久化只是将数据保存在BlockManager中但是其lineage是不变的*
- 后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。
- 而且,持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。
四丶Spark运行架构
12. spark运行基本流程
1) 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2) 资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3) SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4) Task在Executor上运行,运行完毕释放所有资源。
13. Spark运行架构特点
Spark运行架构特点:
- 每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。这种Application隔离机制有其优势的,无论是从调度角度看(每个Driver调度它自己的任务),还是从运行角度看(来自不同Application的Task运行在不同的JVM中)。当然,这也意味着Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。
- Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了。
- 提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换;如果想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。
- Task采用了数据本地性和推测执行的优化机制。