BigData-Notes/notes/Spark累加器与广播变量.md
2019-05-17 15:21:20 +08:00

106 lines
4.5 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spark 累加器与广播变量
<nav>
<a href="#一简介">一、简介</a><br/>
<a href="#二累加器">二、累加器</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-理解闭包">2.1 理解闭包</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-使用累加器">2.2 使用累加器</a><br/>
<a href="#三广播变量">三、广播变量</a><br/>
</nav>
## 一、简介
在Spark中提供了两种类型的共享变量累加器(accumulator)与广播变量(broadcast variable)
+ 累加器:用来对信息进行聚合,主要用于累计计数等场景;
+ 广播变量:主要用于在节点间高效分发大对象。
## 二、累加器
这里先看一个具体的场景,对于正常的累计求和,如果在集群模式中使用下面的代码进行计算,会发现执行结果并非预期:
```scala
var counter = 0
val data = Array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
println(counter)
```
counter最后的结果是0导致这个问题的主要原因是闭包。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-累加器1.png"/> </div>
### 2.1 理解闭包
**1. Scala中闭包的概念**
这里先介绍一下Scala中关于闭包的概念
```
var more = 10
val addMore = (x: Int) => x + more
```
如上函数`addMore`中有两个变量x和more:
- **x** : 是一个绑定变量(bound variable),因为其是该函数的入参,在函数的上下文中有明确的定义;
- **more** : 是一个自由变量(free variable)因为函数字面量本生并没有给more赋予任何含义。
按照定义:在创建函数时,如果需要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。
**2. Spark中的闭包**
在实际计算时Spark会将对RDD操作分解为TaskTask运行在Worker Noode上。在执行之前Spark会对任务进行闭包如果闭包内涉及到自由变量则程序会进行拷贝并将副本变量放在闭包中之后闭包被序列化并发送给每个执行者。因此当在foreach函数中引用`counter`它将不再是Driver节点上的`counter`,而是闭包中的副本`counter`,默认情况下,副本`counter`更新后的值不会回传到Driver所以计数器的最终值仍然为零。
需要注意的是在Local模式下**有可能**执行foreach的Worker Node与Diver处在相同的JVM并引用相同的原始`counter`,这时候更新可能是正确的,但是在集群模式下却不行。所以在遇到此类问题时应优先使用累加器。
累加器的原理实际上很简单就是将每个副本变量的最终值传回Driver由Driver聚合后得到最终值并更新原始变量。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-集群模式.png"/> </div>
### 2.2 使用累加器
`SparkContext`中定义了所有创建累加器的方法需要注意的是被中横线划掉的累加器方法在Spark 2.0.0之后被标识为废弃。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-累加器方法.png"/> </div>
使用示例和执行结果分别如下:
```scala
val data = Array(1, 2, 3, 4, 5)
// 定义累加器
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(data).foreach(x => accum.add(x))
// 获取累加器的值
accum.value
```
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-累加器2.png"/> </div>
## 三、广播变量
在上面介绍中闭包的过程中我们说道每个Task任务的闭包都会持有自由变量的副本如果变量很大且Task任务很多的情况下这必然会对网络IO造成压力为了解决这个情况Spark提供了广播变量。
广播变量的做法很简单就是不把副本变量分发到每个Task中而是将其分发到每个ExecutorExecutor中的所有Task共享一个副本变量。
```scala
// 把一个数组定义为一个广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))
// 之后用到该数组时应优先使用广播变量,而不是原值
sc.parallelize(broadcastVar.value).map(_ * 10).collect()
```
## 参考资料
[RDD Programming Guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide)