diff --git a/README.md b/README.md index 68f68ca..eaecb01 100644 --- a/README.md +++ b/README.md @@ -74,11 +74,11 @@ 1. Spark简介 2. [Spark单机版本环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Spark%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md) -3. 弹性式数据集RDD -4. Spark命令行的基本使用 -5. RDD常用算子之——Transformation Action -6. RDD常用算子之——Action -7. Spark广播变量与累加器 +3. Spark命令行的基本使用 +4. [弹性式数据集RDD](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark-RDD.md) +5. [RDD常用算子详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark-Transformation和Action.md) +6. Spark运行模式 +7. [Spark累加器与广播变量](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark累加器与广播变量.md) **Spark SQL :** diff --git a/notes/Spark累加器与广播变量.md b/notes/Spark累加器与广播变量.md new file mode 100644 index 0000000..e07094b --- /dev/null +++ b/notes/Spark累加器与广播变量.md @@ -0,0 +1,104 @@ +# Spark 累加器与广播变量 + + +## 一、简介 + +在Spark中,提供了两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。 + ++ 累加器:用来对信息进行聚合,主要用于累计计数等场景; ++ 广播变量:主要用于在节点间高效分发大对象。 + +## 二、累加器 + +这里先看一个具体的场景,对于正常的累计求和,如果在集群模式中使用下面的代码进行计算,会发现执行结果并非预期: + +```scala +var counter = 0 +val data = Array(1, 2, 3, 4, 5) +sc.parallelize(data).foreach(x => counter += x) + println(counter) +``` + +counter最后的结果是0,导致这个问题的主要原因是闭包。 + +
+ + + +### 2.1 理解闭包 + +**1. Scala中闭包的概念** + +这里先介绍一下Scala中关于闭包的概念: + +``` +var more = 10 +val addMore = (x: Int) => x + more +``` + +如上函数`addMore`中有两个变量x和more: + +- **x** : 是一个绑定变量(bound variable),因为其是该函数的入参,在函数的上下文中有明确的定义; +- **more** : 是一个自由变量(free variable),因为函数字面量本生并没有给more赋予任何含义。 + +按照定义:在创建函数时,如果需要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。 + +**2. Spark中的闭包** + +在实际计算时,Spark会将对RDD操作分解为Task,Task运行在Worker Noode上。在执行之前,Spark会对任务进行闭包,如果闭包内涉及到自由变量,则程序会进行拷贝,并将副本变量放在闭包中,之后闭包被序列化并发送给每个执行者。因此,当在foreach函数中引用`counter`时,它将不再是Driver节点上的`counter`,而是闭包中的副本`counter`,默认情况下,副本`counter`更新后的值不会回传到Driver,所以计数器的最终值仍然为零。 + +需要注意的是:在Local模式下,**有可能**执行foreach的Worker Node与Diver处在相同的JVM,并引用相同的原始`counter`,这时候更新可能是正确的,但是在集群模式下却不行。所以在遇到此类问题时应优先使用累加器。 + +累加器的原理实际上很简单:就是将每个副本变量的最终值传回Driver,由Driver聚合后得到最终值,并更新原始变量。 + + +
+ +### 2.2 使用累加器 + +`SparkContext`中定义了所有创建累加器的方法,需要注意的是:被中横线划掉的累加器方法在Spark 2.0.0之后被标识为废弃。 + +
+ +使用示例和执行结果分别如下: + +```scala +val data = Array(1, 2, 3, 4, 5) +// 定义累加器 +val accum = sc.longAccumulator("My Accumulator") +sc.parallelize(data).foreach(x => accum.add(x)) +// 获取累加器的值 +accum.value +``` + +
+ + + +## 三、广播变量 + +在上面介绍中闭包的过程中我们说道每个Task任务的闭包都会持有自由变量的副本,如果变量很大且Task任务很多的情况下,这必然会对网络IO造成压力,为了解决这个情况,Spark提供了广播变量。 + +广播变量的做法很简单:就是不把副本变量分发到每个Task中,而是将其分发到每个Executor,Executor中的所有Task共享一个副本变量。 + +```scala +// 把一个数组定义为一个广播变量 +val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5)) +// 之后用到该数组时应优先使用广播变量,而不是原值 +sc.parallelize(broadcastVar.value).map(_ * 10).collect() +``` + + + + + +## 参考资料 + +[RDD Programming Guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide) + diff --git a/pictures/spark-累加器1.png b/pictures/spark-累加器1.png new file mode 100644 index 0000000..d612fc8 Binary files /dev/null and b/pictures/spark-累加器1.png differ diff --git a/pictures/spark-累加器2.png b/pictures/spark-累加器2.png new file mode 100644 index 0000000..b5b9d9d Binary files /dev/null and b/pictures/spark-累加器2.png differ diff --git a/pictures/spark-累加器方法.png b/pictures/spark-累加器方法.png new file mode 100644 index 0000000..3e6dd74 Binary files /dev/null and b/pictures/spark-累加器方法.png differ diff --git a/pictures/spark-集群模式.png b/pictures/spark-集群模式.png new file mode 100644 index 0000000..b1b7c1a Binary files /dev/null and b/pictures/spark-集群模式.png differ