diff --git a/code/spark/spark-core/README.md b/code/spark/spark-core/README.md
deleted file mode 100644
index e1cd8da..0000000
--- a/code/spark/spark-core/README.md
+++ /dev/null
@@ -1,27 +0,0 @@
-val list = List(3,6,9,10,12,21)
-val listRDD = sc.parallelize(list)
-val intsRDD = listRDD.map(_*10)
-intsRDD.foreach(println)
-
-sc.parallelize(list).map(_*10).foreach(println)
-
-
-sc.parallelize(list).filter(_>=10).foreach(println)
-
-val list = List(List(1, 2), List(3), List(), List(4, 5))
-sc.parallelize(list).flatMap(_.toList).map(_*10).foreach(println)
-
-
-val list = List(1,2,3,4,5)
-sc.parallelize(list).reduce((x,y) => x+y)
-sc.parallelize(list).reduce(_+_)
-
-
-val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6),("hadoop", 2))
-sc.parallelize(list).reduceByKey(_+_).foreach(println)
-
-
-
- val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6),("hadoop", 2))
-sc.parallelize(list).groupByKey().map(x=>(x._1,x._2.toList)).foreach(println)
-
diff --git a/code/spark/spark-core/pom.xml b/code/spark/spark-core/pom.xml
index 139270c..c685072 100644
--- a/code/spark/spark-core/pom.xml
+++ b/code/spark/spark-core/pom.xml
@@ -9,7 +9,7 @@
1.0
- 2.12.8
+ 2.12
@@ -27,23 +27,20 @@
+
org.apache.spark
- spark-core_2.12
+ spark-core_${scala.version}
2.4.0
-
- org.scalatest
- scalatest_2.12
- 3.0.1
- test
-
+
junit
junit
4.12
-
+
com.thoughtworks.paranamer
paranamer
diff --git a/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
index 8fcd2a1..1229dae 100644
--- a/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
+++ b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
@@ -3,6 +3,8 @@ package rdd.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.{After, Test}
+import scala.collection.mutable.ListBuffer
+
class TransformationTest {
val conf: SparkConf = new SparkConf().setAppName("TransformationTest").setMaster("local[2]")
@@ -11,10 +13,185 @@ class TransformationTest {
@Test
def map(): Unit = {
- val list = List(3, 6, 9, 10, 12, 21)
+ val list = List(1, 2, 3)
sc.parallelize(list).map(_ * 10).foreach(println)
}
+
+ @Test
+ def filter(): Unit = {
+ val list = List(3, 6, 9, 10, 12, 21)
+ sc.parallelize(list).filter(_ >= 10).foreach(println)
+ }
+
+
+ @Test
+ def flatMap(): Unit = {
+ val list = List(List(1, 2), List(3), List(), List(4, 5))
+ sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
+
+ val lines = List("spark flume spark",
+ "hadoop flume hive")
+ sc.parallelize(lines).flatMap(line => line.split(" ")).
+ map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
+
+ }
+
+
+ @Test
+ def mapPartitions(): Unit = {
+ val list = List(1, 2, 3, 4, 5, 6)
+ sc.parallelize(list, 3).mapPartitions(iterator => {
+ val buffer = new ListBuffer[Int]
+ while (iterator.hasNext) {
+ buffer.append(iterator.next() * 100)
+ }
+ buffer.toIterator
+ }).foreach(println)
+ }
+
+
+ @Test
+ def mapPartitionsWithIndex(): Unit = {
+ val list = List(1, 2, 3, 4, 5, 6)
+ sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
+ val buffer = new ListBuffer[String]
+ while (iterator.hasNext) {
+ buffer.append(index + "分区:" + iterator.next() * 100)
+ }
+ buffer.toIterator
+ }).foreach(println)
+ }
+
+
+ @Test
+ def sample(): Unit = {
+ val list = List(1, 2, 3, 4, 5, 6)
+ sc.parallelize(list).sample(withReplacement = false, 0.5).foreach(println)
+ }
+
+
+ @Test
+ def union(): Unit = {
+ val list1 = List(1, 2, 3)
+ val list2 = List(4, 5, 6)
+ sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
+ }
+
+
+ @Test
+ def intersection(): Unit = {
+ val list1 = List(1, 2, 3, 4, 5)
+ val list2 = List(4, 5, 6)
+ sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
+ }
+
+ @Test
+ def distinct(): Unit = {
+ val list = List(1, 2, 2, 4, 4)
+ sc.parallelize(list).distinct().foreach(println)
+ }
+
+
+ @Test
+ def groupByKey(): Unit = {
+ val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+ sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
+ }
+
+
+ @Test
+ def reduceByKey(): Unit = {
+ val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+ sc.parallelize(list).reduceByKey(_ + _).foreach(println)
+ }
+
+ @Test
+ def aggregateByKey(): Unit = {
+ val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
+ sc.parallelize(list, numSlices = 6).aggregateByKey(zeroValue = 0, numPartitions = 5)(
+ seqOp = math.max(_, _),
+ combOp = _ + _
+ ).getNumPartitions
+ }
+
+
+ @Test
+ def sortBy(): Unit = {
+ val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
+ sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
+
+ val list02 = List(("hadoop", 100), ("spark", 90), ("storm", 120))
+ sc.parallelize(list02).sortBy(x => x._2, ascending = false).foreach(println)
+ }
+
+
+ @Test
+ def join(): Unit = {
+ val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
+ val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
+ sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
+ }
+
+
+ @Test
+ def cogroup(): Unit = {
+ val list01 = List((1, "a"), (1, "a"), (2, "b"), (3, "e"))
+ val list02 = List((1, "A"), (2, "B"), (3, "E"))
+ val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"), (3, "eE"))
+ sc.parallelize(list01).cogroup(sc.parallelize(list02), sc.parallelize(list03)).foreach(println)
+ }
+
+
+ @Test
+ def cartesian(): Unit = {
+ val list1 = List("A", "B", "C")
+ val list2 = List(1, 2, 3)
+ sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
+ }
+
+
+ @Test
+ def reduce(): Unit = {
+ val list = List(1, 2, 3, 4, 5)
+ sc.parallelize(list).reduce((x, y) => x + y)
+ sc.parallelize(list).reduce(_ + _)
+ }
+
+ // 继承Ordering[T],实现自定义比较器
+ class CustomOrdering extends Ordering[(Int, String)] {
+ override def compare(x: (Int, String), y: (Int, String)): Int
+ = if (x._2.length > y._2.length) 1 else -1
+ }
+
+ @Test
+ def takeOrdered(): Unit = {
+ val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
+ // 定义隐式默认值
+ implicit val implicitOrdering = new CustomOrdering
+ sc.parallelize(list).takeOrdered(5)
+ }
+
+
+ @Test
+ def countByKey(): Unit = {
+ val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
+ sc.parallelize(list).countByKey()
+ }
+
+ @Test
+ def saveAsTextFile(): Unit = {
+ val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
+ sc.parallelize(list).saveAsTextFile("/usr/file/temp")
+ }
+
+ @Test
+ def saveAsSequenceFile(): Unit = {
+ val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
+ sc.parallelize(list).saveAsSequenceFile("/usr/file/sequence")
+ }
+
+
@After
def destroy(): Unit = {
sc.stop()
diff --git a/notes/Spark-Transformation和Action.md b/notes/Spark-Transformation和Action.md
index e69de29..5ea245a 100644
--- a/notes/Spark-Transformation和Action.md
+++ b/notes/Spark-Transformation和Action.md
@@ -0,0 +1,420 @@
+# Transformation 和 Action 常用算子
+
+
+
+## 一、Transformation
+
+下表为spark官网给出的常用的Transformation算子:
+
+| Transformation算子 | Meaning(含义) |
+| ------------------------------------------------------------ | ------------------------------------------------------------ |
+| **map**(*func*) | 对原RDD中每个元素运用 *func* 函数,并生成新的RDD |
+| **filter**(*func*) | 对原RDD中每个元素使用*func* 函数进行过滤,并生成新的RDD |
+| **flatMap**(*func*) | 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( *func* 返回类型需要为 Seq )。 |
+| **mapPartitions**(*func*) | 与 map 类似,但函数单独在RDD的每个分区上运行, *func*函数的类型为 Iterator\ => Iterator\ ,其中T是RDD的类型,即RDD[T] |
+| **mapPartitionsWithIndex**(*func*) | 与 mapPartitions 类似,但 *func* 类型为 (Int, Iterator\) => Iterator\ ,其中第一个参数为分区索引 |
+| **sample**(*withReplacement*, *fraction*, *seed*) | 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(*fraction*)、随机数生成器的种子(seed); |
+| **union**(*otherDataset*) | 合并两个RDD |
+| **intersection**(*otherDataset*) | 求两个RDD的交集 |
+| **distinct**([*numTasks*])) | 去重 |
+| **groupByKey**([*numTasks*]) | 按照key值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable\)
**Note:** 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 `reduceByKey` 或 `aggregateByKey` 性能会更好
**Note:** 默认情况下,并行度取决于父 RDD 的分区数。可以传入 `numTasks` 参数进行修改。 |
+| **reduceByKey**(*func*, [*numTasks*]) | 按照key值进行分组,并对分组后的数据执行归约操作。 |
+| **aggregateByKey**(*zeroValue*,*numPartitions*)(*seqOp*, *combOp*, [*numTasks*]) | 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和zeroValue聚合每个键的值。与groupByKey类似,reduce任务的数量可通过第二个参数进行配置。 |
+| **sortByKey**([*ascending*], [*numTasks*]) | 按照key进行排序,其中的key需要实现Ordered特质,即可比较 |
+| **join**(*otherDataset*, [*numTasks*]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用`leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。 |
+| **cogroup**(*otherDataset*, [*numTasks*]) | 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable\, Iterable\)) tuples 的 dataset。 |
+| **cartesian**(*otherDataset*) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。 |
+| **coalesce**(*numPartitions*) | 将RDD中的分区数减少为numPartitions。 |
+| **repartition**(*numPartitions*) | 随机重新调整RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。 |
+| **repartitionAndSortWithinPartitions**(*partitioner*) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 `repartition` 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。 |
+
+下面分别给出这些算子的基本使用实例:
+
+### 1.1 map
+
+```scala
+val list = List(1,2,3)
+sc.parallelize(list).map(_ * 10).foreach(println)
+
+// 输出结果: 10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)
+```
+
+### 1.2 filter
+
+```scala
+val list = List(3, 6, 9, 10, 12, 21)
+sc.parallelize(list).filter(_ >= 10).foreach(println)
+
+// 输出: 10 12 21
+```
+
+### 1.3 flatMap
+
+与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( *func* 返回类型需要为 Seq )。
+
+```scala
+val list = List(List(1, 2), List(3), List(), List(4, 5))
+sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
+
+// 输出结果 : 10 20 30 40 50
+```
+
+flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:
+
+拆分输入的每行数据为单个单词,并赋值为1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:
+
+```scala
+val lines = List("spark flume spark",
+ "hadoop flume hive")
+sc.parallelize(lines).flatMap(line => line.split(" ")).
+map(word=>(word,1)).reduceByKey(_+_).foreach(println)
+
+// 输出:
+(spark,2)
+(hive,1)
+(hadoop,1)
+(flume,2)
+```
+
+### 1.4 mapPartitions
+
+与 map 类似,但函数单独在RDD的每个分区上运行, *func*函数的类型为Iterator\ => Iterator\ (其中T是RDD的类型),即输入和输出都必须是可迭代类型。
+
+```scala
+val list = List(1, 2, 3, 4, 5, 6)
+sc.parallelize(list, 3).mapPartitions(iterator => {
+ val buffer = new ListBuffer[Int]
+ while (iterator.hasNext) {
+ buffer.append(iterator.next() * 100)
+ }
+ buffer.toIterator
+}).foreach(println)
+//输出结果
+100 200 300 400 500 600
+```
+
+### 1.5 mapPartitionsWithIndex
+
+ 与 mapPartitions 类似,但 *func* 类型为 (Int, Iterator\) => Iterator\ ,其中第一个参数为分区索引。
+
+```scala
+val list = List(1, 2, 3, 4, 5, 6)
+sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
+ val buffer = new ListBuffer[String]
+ while (iterator.hasNext) {
+ buffer.append(index + "分区:" + iterator.next() * 100)
+ }
+ buffer.toIterator
+}).foreach(println)
+//输出
+0分区:100
+0分区:200
+1分区:300
+1分区:400
+2分区:500
+2分区:600
+```
+
+### 1.6 sample
+
+ 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(*fraction*)、随机数生成器的种子(seed):
+
+```scala
+val list = List(1, 2, 3, 4, 5, 6)
+sc.parallelize(list).sample(withReplacement = false, 0.5).foreach(println)
+```
+
+### 1.7 union
+
+合并两个RDD:
+
+```scala
+val list1 = List(1, 2, 3)
+val list2 = List(4, 5, 6)
+sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
+// 输出: 1 2 3 4 5 6
+```
+
+### 1.8 intersection
+
+求两个RDD的交集:
+
+```scala
+val list1 = List(1, 2, 3, 4, 5)
+val list2 = List(4, 5, 6)
+sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
+// 输出: 4 5
+```
+
+### 1.9 distinct
+
+去重:
+
+```scala
+val list = List(1, 2, 2, 4, 4)
+sc.parallelize(list).distinct().foreach(println)
+// 输出: 4 1 2
+```
+
+### 1.10 groupByKey
+
+按照键进行分组:
+
+```scala
+val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
+
+//输出:
+(spark,List(3, 5))
+(hadoop,List(2, 2))
+(storm,List(6))
+```
+
+### 1.11 reduceByKey
+
+按照键进行归约操作:
+
+```scala
+val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+sc.parallelize(list).reduceByKey(_ + _).foreach(println)
+
+//输出
+(spark,8)
+(hadoop,4)
+(storm,6)
+```
+
+### 1.12 sortBy & sortByKey
+
+按照键进行排序:
+
+```scala
+val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
+sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
+// 输出
+(120,storm)
+(90,spark)
+(100,hadoop)
+```
+
+按照指定元素进行排序:
+
+```scala
+val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
+sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
+// 输出
+(storm,120)
+(hadoop,100)
+(spark,90)
+```
+
+### 1.13 join
+
+在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用`leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。
+
+```scala
+val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
+val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
+sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
+
+// 输出
+(1,(student01,teacher01))
+(3,(student03,teacher03))
+(2,(student02,teacher02))
+```
+
+### 1.14 cogroup
+
+在一个 (K, V) 对的 dataset 上调用时,返回多个类型为 (K, (Iterable\, Iterable\))的元组所组成的dataset。
+
+```scala
+val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
+val list02 = List((1, "A"), (2, "B"), (3, "E"))
+val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
+sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
+
+// 输出: 同一个RDD中的元素先按照key进行分组,然后再对不同RDD中的元素按照key进行分组
+(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
+(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
+(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))
+
+```
+
+### 1.15 cartesian
+
+计算笛卡尔积:
+
+```scala
+val list1 = List("A", "B", "C")
+val list2 = List(1, 2, 3)
+sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
+
+//输出笛卡尔积
+(A,1)
+(A,2)
+(A,3)
+(B,1)
+(B,2)
+(B,3)
+(C,1)
+(C,2)
+(C,3)
+```
+
+### 1.16 aggregateByKey
+
+当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和zeroValue聚合每个键的值。与groupByKey类似,reduce任务的数量可通过第二个参数`numPartitions`进行配置。示例如下:
+
+```scala
+// 为了清晰,以下所有参数均使用具名传参
+val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
+sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
+ seqOp = math.max(_, _),
+ combOp = _ + _
+ ).collect.foreach(println)
+//输出结果:
+(hadoop,3)
+(storm,8)
+(spark,7)
+```
+
+这里使用了`numSlices = 2`指定aggregateByKey父操作parallelize的分区数量为2,其执行流程如下:
+
+
+
+基于同样的执行流程,如果`numSlices = 1`,则意味着只有输入一个分区,则其最后一步combOp相当于是无效的,执行结果为:
+
+```properties
+(hadoop,3)
+(storm,8)
+(spark,4)
+```
+
+同样的,如果每个单词对一个分区,即`numSlices = 6`,此时相当于求和操作,执行结果为:
+
+```properties
+(hadoop,5)
+(storm,14)
+(spark,7)
+```
+
+最后一个问题是`aggregateByKey(zeroValue = 0,numPartitions = 3)`的第二个参数`numPartitions `决定的是什么?实际上这个参数决定的是输出RDD的分区数量,想要验证这个问题,可以对上面代码进行改写,使用`getNumPartitions`方法获取分区数量:
+
+```scala
+sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
+ seqOp = math.max(_, _),
+ combOp = _ + _
+).getNumPartitions
+```
+
+
+
+## 二、Action
+
+下表为spark官网给出的常用的Action算子:
+
+| Action(动作) | Meaning(含义) |
+| -------------------------------------------------- | ------------------------------------------------------------ |
+| **reduce**(*func*) | 使用函数*func*执行归约操作 |
+| **collect**() | 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。 |
+| **count**() | 返回 dataset 中元素的个数。 |
+| **first**() | 返回 dataset 中的第一个元素,等价于 take(1)。 |
+| **take**(*n*) | 将数据集中的前 *n* 个元素作为一个 array 数组返回。 |
+| **takeSample**(*withReplacement*, *num*, [*seed*]) | 对一个 dataset 进行随机抽样 |
+| **takeOrdered**(*n*, *[ordering]*) | 按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 *n* 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。 |
+| **saveAsTextFile**(*path*) | 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。 |
+| **saveAsSequenceFile**(*path*) | 将 dataset 中的元素以Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求RDD中的元素需要实现 Hadoop 的 Writable 接口。对于Scala语言而言,它可以将Spark中的基本数据类型自动隐式转换为对应Writable类型。(目前仅支持Java and Scala) |
+| **saveAsObjectFile**(*path*) | 使用 Java 序列化后存储,可以使用 `SparkContext.objectFile()` 进行加载。(目前仅支持Java and Scala) |
+| **countByKey**() | 计算每个键出现的次数。 |
+| **foreach**(*func*) | 遍历RDD中每个元素,并对其执行*fun*函数 |
+
+### 2.1 reduce
+
+使用函数*func*执行归约操作:
+
+```scala
+ val list = List(1, 2, 3, 4, 5)
+sc.parallelize(list).reduce((x, y) => x + y)
+sc.parallelize(list).reduce(_ + _)
+
+// 输出 15
+```
+
+### 2.2 takeOrdered
+
+按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 *n* 个元素。需要注意的是`takeOrdered`使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承Ordering[T]实现自定义比较器,然后将其作为隐式参数引入。
+
+```scala
+def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
+ .........
+}
+```
+
+自定义规则排序:
+
+```scala
+// 继承Ordering[T],实现自定义比较器,按照value值的长度进行排序
+class CustomOrdering extends Ordering[(Int, String)] {
+ override def compare(x: (Int, String), y: (Int, String)): Int
+ = if (x._2.length > y._2.length) 1 else -1
+}
+
+val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
+// 引入隐式默认值
+implicit val implicitOrdering = new CustomOrdering
+sc.parallelize(list).takeOrdered(5)
+
+// 输出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)
+```
+
+### 2.3 countByKey
+
+计算每个键出现的次数:
+
+```scala
+val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
+sc.parallelize(list).countByKey()
+
+// 输出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)
+```
+
+### 2.4 saveAsTextFile
+
+将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
+
+```scala
+val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
+sc.parallelize(list).saveAsTextFile("/usr/file/temp")
+```
+
+
+
+
+
+## 参考资料
+
+[RDD Programming Guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide)
+
diff --git a/pictures/spark-aggregateByKey.png b/pictures/spark-aggregateByKey.png
new file mode 100644
index 0000000..7888b4a
Binary files /dev/null and b/pictures/spark-aggregateByKey.png differ
diff --git a/pictures/spark-getpartnum.png b/pictures/spark-getpartnum.png
new file mode 100644
index 0000000..f1c352e
Binary files /dev/null and b/pictures/spark-getpartnum.png differ