Spark_Transformation和Action算子
This commit is contained in:
parent
27a688b69a
commit
ee184710cf
@ -27,7 +27,7 @@
|
|||||||
|
|
||||||
## 一、Transformation
|
## 一、Transformation
|
||||||
|
|
||||||
下表为spark官网给出的常用的Transformation算子:
|
spark常用的Transformation算子如下表:
|
||||||
|
|
||||||
| Transformation算子 | Meaning(含义) |
|
| Transformation算子 | Meaning(含义) |
|
||||||
| ------------------------------------------------------------ | ------------------------------------------------------------ |
|
| ------------------------------------------------------------ | ------------------------------------------------------------ |
|
||||||
@ -51,7 +51,7 @@
|
|||||||
| **repartition**(*numPartitions*) | 随机重新调整RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。 |
|
| **repartition**(*numPartitions*) | 随机重新调整RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。 |
|
||||||
| **repartitionAndSortWithinPartitions**(*partitioner*) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 `repartition` 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。 |
|
| **repartitionAndSortWithinPartitions**(*partitioner*) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 `repartition` 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。 |
|
||||||
|
|
||||||
下面分别给出这些算子的基本使用实例:
|
下面分别给出这些算子的基本使用示例:
|
||||||
|
|
||||||
### 1.1 map
|
### 1.1 map
|
||||||
|
|
||||||
@ -73,7 +73,7 @@ sc.parallelize(list).filter(_ >= 10).foreach(println)
|
|||||||
|
|
||||||
### 1.3 flatMap
|
### 1.3 flatMap
|
||||||
|
|
||||||
与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( *func* 返回类型需要为 Seq )。
|
`flatMap(func)`与`map`类似,但每一个输入的 item 会被映射成 0 个或多个输出的 items( *func* 返回类型需要为`Seq`类型)。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list = List(List(1, 2), List(3), List(), List(4, 5))
|
val list = List(List(1, 2), List(3), List(), List(4, 5))
|
||||||
@ -82,9 +82,7 @@ sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
|
|||||||
// 输出结果 : 10 20 30 40 50
|
// 输出结果 : 10 20 30 40 50
|
||||||
```
|
```
|
||||||
|
|
||||||
flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:
|
flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:拆分输入的每行数据为单个单词,并赋值为1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:
|
||||||
|
|
||||||
拆分输入的每行数据为单个单词,并赋值为1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:
|
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val lines = List("spark flume spark",
|
val lines = List("spark flume spark",
|
||||||
@ -101,7 +99,7 @@ map(word=>(word,1)).reduceByKey(_+_).foreach(println)
|
|||||||
|
|
||||||
### 1.4 mapPartitions
|
### 1.4 mapPartitions
|
||||||
|
|
||||||
与 map 类似,但函数单独在RDD的每个分区上运行, *func*函数的类型为Iterator\<T> => Iterator\<U> (其中T是RDD的类型),即输入和输出都必须是可迭代类型。
|
与 map 类似,但函数单独在RDD的每个分区上运行, *func*函数的类型为`Iterator<T> => Iterator<U>` (其中T是RDD的类型),即输入和输出都必须是可迭代类型。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list = List(1, 2, 3, 4, 5, 6)
|
val list = List(1, 2, 3, 4, 5, 6)
|
||||||
@ -118,7 +116,7 @@ sc.parallelize(list, 3).mapPartitions(iterator => {
|
|||||||
|
|
||||||
### 1.5 mapPartitionsWithIndex
|
### 1.5 mapPartitionsWithIndex
|
||||||
|
|
||||||
与 mapPartitions 类似,但 *func* 类型为 (Int, Iterator\<T>) => Iterator\<U> ,其中第一个参数为分区索引。
|
与 mapPartitions 类似,但 *func* 类型为`(Int, Iterator<T>) => Iterator<U>` ,其中第一个参数为分区索引。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list = List(1, 2, 3, 4, 5, 6)
|
val list = List(1, 2, 3, 4, 5, 6)
|
||||||
@ -140,7 +138,7 @@ sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
|
|||||||
|
|
||||||
### 1.6 sample
|
### 1.6 sample
|
||||||
|
|
||||||
数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(*fraction*)、随机数生成器的种子(seed):
|
数据采样。有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed) :
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list = List(1, 2, 3, 4, 5, 6)
|
val list = List(1, 2, 3, 4, 5, 6)
|
||||||
@ -233,7 +231,7 @@ sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
|
|||||||
|
|
||||||
### 1.13 join
|
### 1.13 join
|
||||||
|
|
||||||
在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用`leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。
|
在一个 (K, V) 和 (K, W) 类型的 Dataset 上调用时,返回一个 (K, (V, W)) 的 Dataset,等价于内连接操作。如果想要执行外连接,可以使用`leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
|
val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
|
||||||
@ -248,7 +246,7 @@ sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
|
|||||||
|
|
||||||
### 1.14 cogroup
|
### 1.14 cogroup
|
||||||
|
|
||||||
在一个 (K, V) 对的 dataset 上调用时,返回多个类型为 (K, (Iterable\<V>, Iterable\<W>))的元组所组成的dataset。
|
在一个 (K, V) 对的 Dataset 上调用时,返回多个类型为 (K, (Iterable\<V>, Iterable\<W>)) 的元组所组成的Dataset。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
|
val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
|
||||||
@ -286,7 +284,7 @@ sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
|
|||||||
|
|
||||||
### 1.16 aggregateByKey
|
### 1.16 aggregateByKey
|
||||||
|
|
||||||
当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和zeroValue聚合每个键的值。与groupByKey类似,reduce任务的数量可通过第二个参数`numPartitions`进行配置。示例如下:
|
当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和zeroValue聚合每个键的值。与`groupByKey`类似,reduce任务的数量可通过第二个参数`numPartitions`进行配置。示例如下:
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
// 为了清晰,以下所有参数均使用具名传参
|
// 为了清晰,以下所有参数均使用具名传参
|
||||||
@ -321,7 +319,7 @@ sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions =
|
|||||||
(spark,7)
|
(spark,7)
|
||||||
```
|
```
|
||||||
|
|
||||||
最后一个问题是`aggregateByKey(zeroValue = 0,numPartitions = 3)`的第二个参数`numPartitions `决定的是什么?实际上这个参数决定的是输出RDD的分区数量,想要验证这个问题,可以对上面代码进行改写,使用`getNumPartitions`方法获取分区数量:
|
`aggregateByKey(zeroValue = 0,numPartitions = 3)`的第二个参数`numPartitions `决定的是输出RDD的分区数量,想要验证这个问题,可以对上面代码进行改写,使用`getNumPartitions`方法获取分区数量:
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
|
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
|
||||||
@ -334,7 +332,7 @@ sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions =
|
|||||||
|
|
||||||
## 二、Action
|
## 二、Action
|
||||||
|
|
||||||
下表为spark官网给出的常用的Action算子:
|
Spark常用的Action算子如下:
|
||||||
|
|
||||||
| Action(动作) | Meaning(含义) |
|
| Action(动作) | Meaning(含义) |
|
||||||
| -------------------------------------------------- | ------------------------------------------------------------ |
|
| -------------------------------------------------- | ------------------------------------------------------------ |
|
||||||
@ -365,7 +363,7 @@ sc.parallelize(list).reduce(_ + _)
|
|||||||
|
|
||||||
### 2.2 takeOrdered
|
### 2.2 takeOrdered
|
||||||
|
|
||||||
按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 *n* 个元素。需要注意的是`takeOrdered`使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承Ordering[T]实现自定义比较器,然后将其作为隐式参数引入。
|
按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 *n* 个元素。需要注意的是`takeOrdered`使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承`Ordering[T]`实现自定义比较器,然后将其作为隐式参数引入。
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
|
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user