BigData-Notes/notes/Spark-Transformation和Action.md
2019-05-15 17:57:21 +08:00

421 lines
18 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Transformation 和 Action 常用算子
<nav>
<a href="#一Transformation">一、Transformation</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#11-map">1.1 map</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#12-filter">1.2 filter</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#13-flatMap">1.3 flatMap</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#14-mapPartitions">1.4 mapPartitions</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#15-mapPartitionsWithIndex">1.5 mapPartitionsWithIndex</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#16-sample">1.6 sample</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#17-union">1.7 union</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#18-intersection">1.8 intersection</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#19-distinct">1.9 distinct</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#110-groupByKey">1.10 groupByKey</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#111-reduceByKey">1.11 reduceByKey</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#112-sortBy--sortByKey">1.12 sortBy & sortByKey </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#113-join">1.13 join</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#114-cogroup">1.14 cogroup</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#115-cartesian">1.15 cartesian</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#116-aggregateByKey">1.16 aggregateByKey</a><br/>
<a href="#二Action">二、Action</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-reduce">2.1 reduce</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-takeOrdered">2.2 takeOrdered</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-countByKey">2.3 countByKey</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-saveAsTextFile">2.4 saveAsTextFile</a><br/>
</nav>
## 一、Transformation
下表为spark官网给出的常用的Transformation算子
| Transformation算子 | Meaning含义 |
| ------------------------------------------------------------ | ------------------------------------------------------------ |
| **map**(*func*) | 对原RDD中每个元素运用 *func* 函数并生成新的RDD |
| **filter**(*func*) | 对原RDD中每个元素使用*func* 函数进行过滤并生成新的RDD |
| **flatMap**(*func*) | 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items *func* 返回类型需要为 Seq )。 |
| **mapPartitions**(*func*) | 与 map 类似但函数单独在RDD的每个分区上运行 *func*函数的类型为 Iterator\<T> => Iterator\<U> 其中T是RDD的类型即RDD[T] |
| **mapPartitionsWithIndex**(*func*) | 与 mapPartitions 类似,但 *func* 类型为 (Int, Iterator\<T>) => Iterator\<U> ,其中第一个参数为分区索引 |
| **sample**(*withReplacement*, *fraction*, *seed*) | 数据采样有三个可选参数设置是否放回withReplacement、采样的百分比*fraction*、随机数生成器的种子seed |
| **union**(*otherDataset*) | 合并两个RDD |
| **intersection**(*otherDataset*) | 求两个RDD的交集 |
| **distinct**([*numTasks*])) | 去重 |
| **groupByKey**([*numTasks*]) | 按照key值进行分区即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable\<V>) <br/>**Note:** 如果分组是为了在每一个 key 上执行聚合操作例如sum 或 average),此时使用 `reduceByKey``aggregateByKey` 性能会更好<br>**Note:** 默认情况下,并行度取决于父 RDD 的分区数。可以传入 `numTasks` 参数进行修改。 |
| **reduceByKey**(*func*, [*numTasks*]) | 按照key值进行分组并对分组后的数据执行归约操作。 |
| **aggregateByKey**(*zeroValue*,*numPartitions*)(*seqOp*, *combOp*, [*numTasks*]) | 当调用KV对的数据集时返回KU对的数据集其中使用给定的组合函数和zeroValue聚合每个键的值。与groupByKey类似reduce任务的数量可通过第二个参数进行配置。 |
| **sortByKey**([*ascending*], [*numTasks*]) | 按照key进行排序其中的key需要实现Ordered特质即可比较 |
| **join**(*otherDataset*, [*numTasks*]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset等价于内连接操作。如果想要执行外连接可以使用`leftOuterJoin`, `rightOuterJoin``fullOuterJoin` 等算子。 |
| **cogroup**(*otherDataset*, [*numTasks*]) | 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable\<V>, Iterable\<W>)) tuples 的 dataset。 |
| **cartesian**(*otherDataset*) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset即笛卡尔积。 |
| **coalesce**(*numPartitions*) | 将RDD中的分区数减少为numPartitions。 |
| **repartition**(*numPartitions*) | 随机重新调整RDD中的数据以创建更多或更少的分区并在它们之间进行平衡。 |
| **repartitionAndSortWithinPartitions**(*partitioner*) | 根据给定的 partitioner分区器对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 `repartition` 然后再 sorting排序效率更高因为它可以将排序过程推送到 shuffle 操作所在的机器。 |
下面分别给出这些算子的基本使用实例:
### 1.1 map
```scala
val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出结果: 10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)
```
### 1.2 filter
```scala
val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)
// 输出: 10 12 21
```
### 1.3 flatMap
与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items *func* 返回类型需要为 Seq )。
```scala
val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
// 输出结果 10 20 30 40 50
```
flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:
拆分输入的每行数据为单个单词并赋值为1代表出现一次之后按照单词分组并统计其出现总次数代码如下
```scala
val lines = List("spark flume spark",
"hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)
// 输出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)
```
### 1.4 mapPartitions
与 map 类似但函数单独在RDD的每个分区上运行 *func*函数的类型为Iterator\<T> => Iterator\<U> 其中T是RDD的类型即输入和输出都必须是可迭代类型。
```scala
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
val buffer = new ListBuffer[Int]
while (iterator.hasNext) {
buffer.append(iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//输出结果
100 200 300 400 500 600
```
### 1.5 mapPartitionsWithIndex
与 mapPartitions 类似,但 *func* 类型为 (Int, Iterator\<T>) => Iterator\<U> ,其中第一个参数为分区索引。
```scala
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分区:" + iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//输出
0分区:100
0分区:200
1分区:300
1分区:400
2分区:500
2分区:600
```
### 1.6 sample
数据采样有三个可选参数设置是否放回withReplacement、采样的百分比*fraction*、随机数生成器的种子seed:
```scala
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, 0.5).foreach(println)
```
### 1.7 union
合并两个RDD
```scala
val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 输出: 1 2 3 4 5 6
```
### 1.8 intersection
求两个RDD的交集
```scala
val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 输出: 4 5
```
### 1.9 distinct
去重:
```scala
val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 输出: 4 1 2
```
### 1.10 groupByKey
按照键进行分组:
```scala
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
//输出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))
```
### 1.11 reduceByKey
按照键进行归约操作:
```scala
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)
//输出
(spark,8)
(hadoop,4)
(storm,6)
```
### 1.12 sortBy & sortByKey
按照键进行排序:
```scala
val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 输出
(120,storm)
(90,spark)
(100,hadoop)
```
按照指定元素进行排序:
```scala
val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 输出
(storm,120)
(hadoop,100)
(spark,90)
```
### 1.13 join
在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset等价于内连接操作。如果想要执行外连接可以使用`leftOuterJoin`, `rightOuterJoin``fullOuterJoin` 等算子。
```scala
val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
// 输出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))
```
### 1.14 cogroup
在一个 (K, V) 对的 dataset 上调用时,返回多个类型为 (K, (Iterable\<V>, Iterable\<W>))的元组所组成的dataset。
```scala
val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
// 输出: 同一个RDD中的元素先按照key进行分组然后再对不同RDD中的元素按照key进行分组
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))
```
### 1.15 cartesian
计算笛卡尔积:
```scala
val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
//输出笛卡尔积
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)
```
### 1.16 aggregateByKey
当调用KV对的数据集时返回KU对的数据集其中使用给定的组合函数和zeroValue聚合每个键的值。与groupByKey类似reduce任务的数量可通过第二个参数`numPartitions`进行配置。示例如下:
```scala
// 为了清晰,以下所有参数均使用具名传参
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_, _),
combOp = _ + _
).collect.foreach(println)
//输出结果:
(hadoop,3)
(storm,8)
(spark,7)
```
这里使用了`numSlices = 2`指定aggregateByKey父操作parallelize的分区数量为2其执行流程如下
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-aggregateByKey.png"/> </div>
基于同样的执行流程,如果`numSlices = 1`则意味着只有输入一个分区则其最后一步combOp相当于是无效的执行结果为
```properties
(hadoop,3)
(storm,8)
(spark,4)
```
同样的,如果每个单词对一个分区,即`numSlices = 6`,此时相当于求和操作,执行结果为:
```properties
(hadoop,5)
(storm,14)
(spark,7)
```
最后一个问题是`aggregateByKey(zeroValue = 0,numPartitions = 3)`的第二个参数`numPartitions `决定的是什么实际上这个参数决定的是输出RDD的分区数量想要验证这个问题可以对上面代码进行改写使用`getNumPartitions`方法获取分区数量:
```scala
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_, _),
combOp = _ + _
).getNumPartitions
```
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-getpartnum.png"/> </div>
## 二、Action
下表为spark官网给出的常用的Action算子
| Action动作 | Meaning含义 |
| -------------------------------------------------- | ------------------------------------------------------------ |
| **reduce**(*func*) | 使用函数*func*执行归约操作 |
| **collect**() | 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。 |
| **count**() | 返回 dataset 中元素的个数。 |
| **first**() | 返回 dataset 中的第一个元素,等价于 take(1)。 |
| **take**(*n*) | 将数据集中的前 *n* 个元素作为一个 array 数组返回。 |
| **takeSample**(*withReplacement*, *num*, [*seed*]) | 对一个 dataset 进行随机抽样 |
| **takeOrdered**(*n*, *[ordering]*) | 按自然顺序natural order或自定义比较器custom comparator排序后返回前 *n* 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。 |
| **saveAsTextFile**(*path*) | 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。 |
| **saveAsSequenceFile**(*path*) | 将 dataset 中的元素以Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求RDD中的元素需要实现 Hadoop 的 Writable 接口。对于Scala语言而言它可以将Spark中的基本数据类型自动隐式转换为对应Writable类型。(目前仅支持Java and Scala) |
| **saveAsObjectFile**(*path*) | 使用 Java 序列化后存储,可以使用 `SparkContext.objectFile()` 进行加载。(目前仅支持Java and Scala) |
| **countByKey**() | 计算每个键出现的次数。 |
| **foreach**(*func*) | 遍历RDD中每个元素并对其执行*fun*函数 |
### 2.1 reduce
使用函数*func*执行归约操作:
```scala
val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)
// 输出 15
```
### 2.2 takeOrdered
按自然顺序natural order或自定义比较器custom comparator排序后返回前 *n* 个元素。需要注意的是`takeOrdered`使用隐式参数进行隐式转换以下为其源码。所以在使用自定义排序时需要继承Ordering[T]实现自定义比较器,然后将其作为隐式参数引入。
```scala
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
.........
}
```
自定义规则排序:
```scala
// 继承Ordering[T],实现自定义比较器按照value值的长度进行排序
class CustomOrdering extends Ordering[(Int, String)] {
override def compare(x: (Int, String), y: (Int, String)): Int
= if (x._2.length > y._2.length) 1 else -1
}
val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
// 引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)
// 输出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)
```
### 2.3 countByKey
计算每个键出现的次数:
```scala
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()
// 输出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)
```
### 2.4 saveAsTextFile
将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
```scala
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")
```
## 参考资料
[RDD Programming Guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide)