弹性式数据集RDDs

This commit is contained in:
罗祥 2019-06-04 13:59:45 +08:00
parent c15c675299
commit 3729e8a4b4

View File

@ -23,15 +23,15 @@
## 一、RDD简介
RDD全称为 Resilient Distributed Datasets是Spark最基本的数据抽象它是只读的、分区记录的集合支持并行操作。RDD可以由外部数据集或其他RDD转换而来。其具备高度的容错性允许开发人员在大型集群上执行基于内存的并行计算。它具有以下特性:
`RDD`全称为Resilient Distributed Datasets是Spark最基本的数据抽象它是只读的、分区记录的集合支持并行操作可以由外部数据集或其他RDD转换而来它具有以下特性:
+ 一个RDD由一个或者多个分区Partitions组成。对于RDD来说每个分区会被一个计算任务所处理用户可以在创建RDD时指定其分区个数如果没有指定则采用程序所分配到的CPU的核心数
+ 一个RDD由一个或者多个分区Partitions组成。对于RDD来说每个分区会被一个计算任务所处理用户可以在创建RDD时指定其分区个数如果没有指定默认采用程序所分配到的CPU的核心数
+ RDD拥有一个用于计算分区的函数compute
+ RDD会保存彼此间的依赖关系RDD的每次转换都会生成一个新的依赖关系这种RDD之间的依赖关系就像流水线一样。在部分分区数据丢失后可以通过这种依赖关系重新计算丢失的分区数据而不是对RDD的所有分区进行重新计算
+ Key-Value型的RDD还拥有Partitioner(分区器)用于决定数据被存储在哪个分区中目前Spark中支持HashPartitioner(按照哈希分区)和RangeParationer(按照范围进行分区)
+ 一个优先位置列表(可选),用于存储每个分区的优先位置(prefered location)。对于一个HDFS文件来说这个列表保存的就是每个分区所在的块的位置按照“移动数据不如移动计算“的理念Spark在进行任务调度的时候会尽可能的将计算任务分配到其所要处理数据块的存储位置。
RDD[T]抽象类的部分相关代码如下:
`RDD[T]`抽象类的部分相关代码如下:
```scala
// 由子类实现以计算给定分区
@ -89,7 +89,7 @@ val dataRDD = sc.parallelize(data,2)
### 2.2 引用外部存储系统中的数据集
引用外部存储系统中的数据集,例如共享文件系统HDFSHBase或支持Hadoop InputFormat的任何数据源。
引用外部存储系统中的数据集,例如本地文件系统HDFSHBase或支持Hadoop InputFormat的任何数据源。
```scala
val fileRDD = sc.textFile("/usr/file/emp.txt")
@ -97,22 +97,20 @@ val fileRDD = sc.textFile("/usr/file/emp.txt")
fileRDD.take(1)
```
使用外部存储系统有以下三点需要注意
使用外部存储系统时需要注意以下两点
+ 支持本地文件系统也支持HDFSs3a等文件系统
+ 如果在集群环境下从本地文件系统读取数据,则要求该文件必须在集群中所有机器上都存在,且路径相同
+ 如果Spark是以集群的方式运行且需要从本地文件系统读取数据则该文件必须在所有节点机器上都存在且路径相同
+ 文件格式支持目录,压缩文件,文件路径支持通配符。
+ 支持目录路径,支持压缩文件,支持使用通配符。
### 2.3 textFile & wholeTextFiles
两者都可以用来读取外部文件,但是返回格式是不同的:
+ textFile其返回格式是RDD[String] 返回的是就是文件内容RDD中每一个元素对应一行数据
+ wholeTextFiles其返回格式是RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容;
+ **textFile**:其返回格式是`RDD[String]` 返回的是就是文件内容RDD中每一个元素对应一行数据
+ **wholeTextFiles**:其返回格式是`RDD[(String, String)]`,元组中第一个参数是文件路径,第二个参数是文件内容;
+ 两者都提供第二个参数来控制最小分区数;
+ 默认情况下Spark为文件的每个块创建一个分区HDFS中默认为128MB
+ 从HDFS上读取文件时Spark会为每个块创建一个分区
```scala
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
@ -123,7 +121,7 @@ def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[
## 三、操作RDD
RDD支持两种类型的操作*transformations*(转换,从现有数据集创建新数据集)和*actions*在数据集上运行计算后将值返回到驱动程序。RDD中的所有转换操作都是惰性的它们只是记住这些转换操作但不会立即执行只有遇到action操作后才会真正的进行计算这类似于函数式编程中的惰性求值。
RDD支持两种类型的操作*transformations*(转换,从现有数据集创建新数据集)和 *actions*在数据集上运行计算后将值返回到驱动程序。RDD中的所有转换操作都是惰性的它们只是记住这些转换操作但不会立即执行只有遇到 *action*操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。
```scala
val list = List(1, 2, 3)
@ -138,28 +136,28 @@ sc.parallelize(list).map(_ * 10).foreach(println)
### 4.1 缓存级别
Spark速度非常快的一个原因是RDD支持缓存。当缓存一个RDD到内存中如果之后的操作使用到了该数据集则从缓存获取。虽然缓存也有丢失的风险但是由于RDD之间的依赖关系如果某个分区的缓存数据丢失只需要重新计算该分区即可。
Spark速度非常快的一个原因是RDD支持缓存。成功缓存后,如果之后的操作使用到了该数据集,则直接从缓存获取。虽然缓存也有丢失的风险但是由于RDD之间的依赖关系如果某个分区的缓存数据丢失只需要重新计算该分区即可。
Spark支持多种缓存级别见下表
| Storage Level存储级别 | Meaning含义 |
| ----------------------------------------------- | ------------------------------------------------------------ |
| MEMORY_ONLY | 默认的缓存级别,将 RDD以反序列化的Java对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
| MEMORY_AND_DISK | 将 RDD 以反序列化的Java对象的形式存储JVM中。如果内存空间不够将未缓存的分区数据存储到磁盘在需要使用这些分区时从磁盘读取。 |
| MEMORY_ONLY_SER<br/>(仅支持 Java and Scala) | 将 RDD 以序列化的Java对象的形式进行存储每个分区为一个 byte 数组。这种方式比反序列化对象节省存储空间但在读取时会增加CPU的计算负担。 |
| MEMORY_AND_DISK_SER<br/>(仅支持 Java and Scala) | 类似于MEMORY_ONLY_SER但是溢出的分区数据会存储到磁盘而不是在用到它们时重新计算。 |
| DISK_ONLY | 只在磁盘上缓存RDD |
| MEMORY_ONLY_2, <br/>MEMORY_AND_DISK_2, etc | 与上面的对应级别功能相同,但是会为每个分区在集群中两个节点上建立副本。 |
| OFF_HEAP | 与MEMORY_ONLY_SER类似但将数据存储在堆外内存中。这需要启用堆外内存。 |
| Storage Level<br/>(存储级别) | Meaning含义 |
| ------------------------------------------------- | ------------------------------------------------------------ |
| `MEMORY_ONLY` | 默认的缓存级别,将 RDD以反序列化的Java对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
| `MEMORY_AND_DISK` | 将 RDD 以反序列化的Java对象的形式存储JVM中。如果内存空间不够将未缓存的分区数据存储到磁盘在需要使用这些分区时从磁盘读取。 |
| `MEMORY_ONLY_SER`<br/>(仅支持 Java and Scala) | 将 RDD 以序列化的Java对象的形式进行存储每个分区为一个 byte 数组。这种方式比反序列化对象节省存储空间但在读取时会增加CPU的计算负担。 |
| `MEMORY_AND_DISK_SER`<br/>(仅支持 Java and Scala) | 类似于`MEMORY_ONLY_SER`,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。 |
| `DISK_ONLY` | 只在磁盘上缓存RDD |
| `MEMORY_ONLY_2`, <br/>`MEMORY_AND_DISK_2`, etc | 与上面的对应级别功能相同,但是会为每个分区在集群中两个节点上建立副本。 |
| `OFF_HEAP` | 与`MEMORY_ONLY_SER`类似,但将数据存储在堆外内存中。这需要启用堆外内存。 |
> 启动堆外内存需要配置两个参数:
>
> + spark.memory.offHeap.enabled 是否开启堆外内存默认值为false,需要设置为true;
> + spark.memory.offHeap.size : 堆外内存空间的大小默认值为0需要设置为正值。
> + **spark.memory.offHeap.enabled** 是否开启堆外内存默认值为false需要设置为true
> + **spark.memory.offHeap.size** : 堆外内存空间的大小默认值为0需要设置为正值。
### 4.2 使用缓存
RDD上有两个可选的方法用于缓存数据`persist``cache` cache内部调用的也是persist,其等价于`persist(StorageLevel.MEMORY_ONLY)`
缓存数据的方法有两个:`persist``cache``cache`内部调用的也是`persist`,其等价于`persist(StorageLevel.MEMORY_ONLY)`
```scala
// 所有存储级别均定义在StorageLevel对象中
@ -177,7 +175,7 @@ Spark会自动监视每个节点上的缓存使用情况并按照最近最少
### 5.1 shuffle介绍
Spark中一个任务对应一个分区通常不会跨分区操作数据。但如果遇到reduceByKey等操作Spark必须从所有分区读取数据并查找所有键的所有值然后汇总在一起以计算每个键的最终结果 ,这称为shuffle
Spark中一个任务对应一个分区通常不会跨分区操作数据。但如果遇到`reduceByKey`等操作Spark必须从所有分区读取数据并查找所有键的所有值然后汇总在一起以计算每个键的最终结果 ,这称为`Shuffle`
<div align="center"> <img width="600px" src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-reducebykey.png"/> </div>
@ -185,15 +183,15 @@ Spark中一个任务对应一个分区通常不会跨分区操作数据。
### 5.2 Shuffle的影响
Shuffle是一项昂贵的操作因为它通常会跨节点操作数据必然会涉及磁盘I/O网络I/O和数据序列化。某些shuffle操作还会消耗大量的堆内存因为它们使用内存中的数据结构来组织数据并传输。Shuffle还会在磁盘上生成大量中间文件从Spark 1.3开始这些文件将被保留直到相应的RDD不再使用并进行垃圾回收这样做是为了避免在计算时重复创建shuffle文件。如果应用程序长期保留对这些RDD的引用则垃圾回收可能在很长一段时间后才会发生这意味着长时间运行的Spark作业可能会占用大量磁盘空间通常可以使用`spark.local.dir`参数指定这些文件临时存储目录。
Shuffle是一项昂贵的操作因为它通常会跨节点操作数据这会涉及磁盘I/O网络I/O和数据序列化。某些Shuffle操作还会消耗大量的堆内存因为它们使用内存来临时存储需要网络传输的数据。Shuffle还会在磁盘上生成大量中间文件从Spark 1.3开始这些文件将被保留直到相应的RDD不再使用并进行垃圾回收这样做是为了避免在计算时重复创建Shuffle文件。如果应用程序长期保留对这些RDD的引用则垃圾回收可能在很长一段时间后才会发生这意味着长时间运行的Spark作业可能会占用大量磁盘空间通常可以使用`spark.local.dir`参数指定这些临时文件的存储目录。
### 5.3 导致Shuffle的操作
由于Shuffle操作对性能的影响比较大所以需要特别注意使用以下操作都会导致Shuffle
+ 涉及到重新分区操作: 如`repartition``coalesce`;
+ 所有涉及到ByKey的操作counting除外`groupByKey``reduceByKey`;
+ 联结操作:如`cogroup``join`
+ **涉及到重新分区操作**`repartition``coalesce`
+ **所有涉及到ByKey的操作**:如`groupByKey``reduceByKey`,但`countByKey`除外;
+ **联结操作**:如`cogroup``join`
@ -201,10 +199,10 @@ Shuffle是一项昂贵的操作因为它通常会跨节点操作数据
RDD和它的父RDD(s)之间的依赖关系分为两种不同的类型:
- 窄依赖(narrow dependency)父RDDs的一个分区最多被子RDDs一个分区所依赖
- 宽依赖(wide dependency)父RDDs的一个分区可以被子RDDs的多个子分区所依赖。
- **窄依赖(narrow dependency)**父RDDs的一个分区最多被子RDDs一个分区所依赖
- **宽依赖(wide dependency)**父RDDs的一个分区可以被子RDDs的多个子分区所依赖。
如下图:每一个方框表示一个 RDD带有颜色的矩形表示分区
如下图每一个方框表示一个RDD带有颜色的矩形表示分区
<div align="center"> <img width="600px" src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-窄依赖和宽依赖.png"/> </div>
@ -212,16 +210,14 @@ RDD和它的父RDD(s)之间的依赖关系分为两种不同的类型:
区分这两种依赖是非常有用的:
+ 首先窄依赖允许在一个集群节点上以流水线的方式pipeline对父分区数据进行计算。例如先执行map操作、然后执行filter操作而宽依赖则需要首先计算好所有父分区的数据然后在节点之间进行Shuffle这与MapReduce类似。
+ 窄依赖能够更有效地进行数据恢复因为只需重新对丢失分区的父分区进行计算且不同节点之间可以并行计算而对于宽依赖而言如果数据丢失则需要对所有父分区数据进行计算并Shuffle。
+ 首先窄依赖允许在一个集群节点上以流水线的方式pipeline对父分区数据进行计算例如先执行map操作然后执行filter操作。而宽依赖则需要计算好所有父分区的数据然后再在节点之间进行Shuffle这与MapReduce类似。
+ 窄依赖能够更有效地进行数据恢复,因为只需重新对丢失分区的父分区进行计算,且不同节点之间可以并行计算;而对于宽依赖而言,如果数据丢失,则需要对所有父分区数据进行计算并再次Shuffle。
## 六、DAG的生成
RDD(s)及其之间的依赖关系组成了DAG(有向无环图)DAG定义了这些RDD(s)之间的Lineage(血统)关系通过血统关系如果一个RDD的部分或者全部计算结果丢失了也可以重新进行计算。
那么Spark是如何根据DAG来生成计算任务呢主要是根据依赖关系的不同将DAG划分为不同的计算阶段(Stage)
RDD(s)及其之间的依赖关系组成了DAG(有向无环图)DAG定义了这些RDD(s)之间的Lineage(血统)关系通过血统关系如果一个RDD的部分或者全部计算结果丢失了也可以重新进行计算。那么Spark是如何根据DAG来生成计算任务呢主要是根据依赖关系的不同将DAG划分为不同的计算阶段(Stage)
+ 对于窄依赖,由于分区的依赖关系是确定的,其转换操作可以在同一个线程执行,所以可以划分到同一个执行阶段;
+ 对于宽依赖由于Shuffle的存在只能在父RDD(s)被Shuffle处理完成后才能开始接下来的计算因此遇到宽依赖就需要重新划分阶段。