From 3729e8a4b40b884bb307e8c1490c85b89e8af6b2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com>
Date: Tue, 4 Jun 2019 13:59:45 +0800
Subject: [PATCH] =?UTF-8?q?=E5=BC=B9=E6=80=A7=E5=BC=8F=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E9=9B=86RDDs?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
notes/Spark_RDD.md | 74 ++++++++++++++++++++++------------------------
1 file changed, 35 insertions(+), 39 deletions(-)
diff --git a/notes/Spark_RDD.md b/notes/Spark_RDD.md
index d11e44f..2ecc748 100644
--- a/notes/Spark_RDD.md
+++ b/notes/Spark_RDD.md
@@ -23,15 +23,15 @@
## 一、RDD简介
-RDD,全称为 Resilient Distributed Datasets,是Spark最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作。RDD可以由外部数据集或其他RDD转换而来。其具备高度的容错性,允许开发人员在大型集群上执行基于内存的并行计算。它具有以下特性:
+`RDD`全称为Resilient Distributed Datasets,是Spark最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他RDD转换而来,它具有以下特性:
-+ 一个RDD由一个或者多个分区(Partitions)组成。对于RDD来说,每个分区会被一个计算任务所处理,用户可以在创建RDD时指定其分区个数,如果没有指定,则采用程序所分配到的CPU的核心数;
++ 一个RDD由一个或者多个分区(Partitions)组成。对于RDD来说,每个分区会被一个计算任务所处理,用户可以在创建RDD时指定其分区个数,如果没有指定,则默认采用程序所分配到的CPU的核心数;
+ RDD拥有一个用于计算分区的函数compute;
+ RDD会保存彼此间的依赖关系,RDD的每次转换都会生成一个新的依赖关系,这种RDD之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算;
+ Key-Value型的RDD还拥有Partitioner(分区器),用于决定数据被存储在哪个分区中,目前Spark中支持HashPartitioner(按照哈希分区)和RangeParationer(按照范围进行分区);
+ 一个优先位置列表(可选),用于存储每个分区的优先位置(prefered location)。对于一个HDFS文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算“的理念,Spark在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。
-RDD[T]抽象类的部分相关代码如下:
+`RDD[T]`抽象类的部分相关代码如下:
```scala
// 由子类实现以计算给定分区
@@ -89,7 +89,7 @@ val dataRDD = sc.parallelize(data,2)
### 2.2 引用外部存储系统中的数据集
-引用外部存储系统中的数据集,例如共享文件系统,HDFS,HBase或支持Hadoop InputFormat的任何数据源。
+引用外部存储系统中的数据集,例如本地文件系统,HDFS,HBase或支持Hadoop InputFormat的任何数据源。
```scala
val fileRDD = sc.textFile("/usr/file/emp.txt")
@@ -97,22 +97,20 @@ val fileRDD = sc.textFile("/usr/file/emp.txt")
fileRDD.take(1)
```
-使用外部存储系统有以下三点需要注意:
+使用外部存储系统时需要注意以下两点:
-+ 支持本地文件系统,也支持HDFS,s3a等文件系统;
++ 如果在集群环境下从本地文件系统读取数据,则要求该文件必须在集群中所有机器上都存在,且路径相同;
-+ 如果Spark是以集群的方式运行,且需要从本地文件系统读取数据,则该文件必须在所有节点机器上都存在,且路径相同;
-
-+ 文件格式支持目录,压缩文件,文件路径支持通配符。
++ 支持目录路径,支持压缩文件,支持使用通配符。
### 2.3 textFile & wholeTextFiles
两者都可以用来读取外部文件,但是返回格式是不同的:
-+ textFile:其返回格式是RDD[String] ,返回的是就是文件内容,RDD中每一个元素对应一行数据;
-+ wholeTextFiles:其返回格式是RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容;
++ **textFile**:其返回格式是`RDD[String]` ,返回的是就是文件内容,RDD中每一个元素对应一行数据;
++ **wholeTextFiles**:其返回格式是`RDD[(String, String)]`,元组中第一个参数是文件路径,第二个参数是文件内容;
+ 两者都提供第二个参数来控制最小分区数;
-+ 默认情况下,Spark为文件的每个块创建一个分区(HDFS中默认为128MB)。
++ 从HDFS上读取文件时,Spark会为每个块创建一个分区。
```scala
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
@@ -123,7 +121,7 @@ def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[
## 三、操作RDD
-RDD支持两种类型的操作:*transformations*(转换,从现有数据集创建新数据集)和*actions*(在数据集上运行计算后将值返回到驱动程序)。RDD中的所有转换操作都是惰性的,它们只是记住这些转换操作,但不会立即执行,只有遇到action操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。
+RDD支持两种类型的操作:*transformations*(转换,从现有数据集创建新数据集)和 *actions*(在数据集上运行计算后将值返回到驱动程序)。RDD中的所有转换操作都是惰性的,它们只是记住这些转换操作,但不会立即执行,只有遇到 *action*操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。
```scala
val list = List(1, 2, 3)
@@ -138,28 +136,28 @@ sc.parallelize(list).map(_ * 10).foreach(println)
### 4.1 缓存级别
-Spark速度非常快的一个原因是RDD支持缓存。当缓存一个RDD到内存中后,如果之后的操作使用到了该数据集,则从缓存获取。虽然缓存也有丢失的风险,但是由于RDD之间的依赖关系,如果某个分区的缓存数据丢失,只需要重新计算该分区即可。
+Spark速度非常快的一个原因是RDD支持缓存。成功缓存后,如果之后的操作使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,但是由于RDD之间的依赖关系,如果某个分区的缓存数据丢失,只需要重新计算该分区即可。
Spark支持多种缓存级别,见下表:
-| Storage Level(存储级别) | Meaning(含义) |
-| ----------------------------------------------- | ------------------------------------------------------------ |
-| MEMORY_ONLY | 默认的缓存级别,将 RDD以反序列化的Java对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
-| MEMORY_AND_DISK | 将 RDD 以反序列化的Java对象的形式存储JVM中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在需要使用这些分区时从磁盘读取。 |
-| MEMORY_ONLY_SER
(仅支持 Java and Scala) | 将 RDD 以序列化的Java对象的形式进行存储(每个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增加CPU的计算负担。 |
-| MEMORY_AND_DISK_SER
(仅支持 Java and Scala) | 类似于MEMORY_ONLY_SER,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。 |
-| DISK_ONLY | 只在磁盘上缓存RDD |
-| MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc | 与上面的对应级别功能相同,但是会为每个分区在集群中两个节点上建立副本。 |
-| OFF_HEAP | 与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中。这需要启用堆外内存。 |
+| Storage Level
(存储级别) | Meaning(含义) |
+| ------------------------------------------------- | ------------------------------------------------------------ |
+| `MEMORY_ONLY` | 默认的缓存级别,将 RDD以反序列化的Java对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
+| `MEMORY_AND_DISK` | 将 RDD 以反序列化的Java对象的形式存储JVM中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在需要使用这些分区时从磁盘读取。 |
+| `MEMORY_ONLY_SER`
(仅支持 Java and Scala) | 将 RDD 以序列化的Java对象的形式进行存储(每个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增加CPU的计算负担。 |
+| `MEMORY_AND_DISK_SER`
(仅支持 Java and Scala) | 类似于`MEMORY_ONLY_SER`,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。 |
+| `DISK_ONLY` | 只在磁盘上缓存RDD |
+| `MEMORY_ONLY_2`,
`MEMORY_AND_DISK_2`, etc | 与上面的对应级别功能相同,但是会为每个分区在集群中的两个节点上建立副本。 |
+| `OFF_HEAP` | 与`MEMORY_ONLY_SER`类似,但将数据存储在堆外内存中。这需要启用堆外内存。 |
> 启动堆外内存需要配置两个参数:
>
-> + spark.memory.offHeap.enabled :是否开启堆外内存,默认值为false,需要设置为true;
-> + spark.memory.offHeap.size : 堆外内存空间的大小,默认值为0,需要设置为正值。
+> + **spark.memory.offHeap.enabled** :是否开启堆外内存,默认值为false,需要设置为true;
+> + **spark.memory.offHeap.size** : 堆外内存空间的大小,默认值为0,需要设置为正值。
### 4.2 使用缓存
-RDD上有两个可选的方法用于缓存数据:`persist`和`cache` ,cache内部调用的也是persist,其等价于`persist(StorageLevel.MEMORY_ONLY)`。
+缓存数据的方法有两个:`persist`和`cache` 。`cache`内部调用的也是`persist`,其等价于`persist(StorageLevel.MEMORY_ONLY)`。
```scala
// 所有存储级别均定义在StorageLevel对象中
@@ -177,7 +175,7 @@ Spark会自动监视每个节点上的缓存使用情况,并按照最近最少
### 5.1 shuffle介绍
-Spark中,一个任务对应一个分区,通常不会跨分区操作数据。但如果遇到reduceByKey等操作,Spark必须从所有分区读取数据,并查找所有键的所有值,然后汇总在一起以计算每个键的最终结果 ,这称为shuffle。
+在Spark中,一个任务对应一个分区,通常不会跨分区操作数据。但如果遇到`reduceByKey`等操作,Spark必须从所有分区读取数据,并查找所有键的所有值,然后汇总在一起以计算每个键的最终结果 ,这称为`Shuffle`。