BigData-Notes/notes/SparkSQL_Dataset和DataFrame简介.md

148 lines
9.5 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# DataFrame和Dataset简介
<nav>
<a href="#一Spark-SQL简介">一、Spark SQL简介</a><br/>
<a href="#二DataFrame--DataSet">二、DataFrame & DataSet </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-DataFrame">2.1 DataFrame </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-DataFrame-对比-RDDs">2.2 DataFrame 对比 RDDs</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-DataSet">2.3 DataSet</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-静态类型与运行时类型安全">2.4 静态类型与运行时类型安全</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#25-Untyped--Typed">2.5 Untyped & Typed </a><br/>
<a href="#三DataFrame--DataSet---RDDs-总结">三、DataFrame & DataSet & RDDs 总结</a><br/>
<a href="#四Spark-SQL的运行原理">四、Spark SQL的运行原理</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-逻辑计划Logical-Plan">4.1 逻辑计划(Logical Plan)</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42-物理计划Physical-Plan">4.2 物理计划(Physical Plan) </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#43-执行">4.3 执行</a><br/>
</nav>
## 一、Spark SQL简介
Spark SQL 是 Spark 中的一个子模块,主要用于操作结构化数据。它具有以下特点:
+ 能够将 SQL 查询与 Spark 程序无缝混合,允许您使用 SQL 或 DataFrame API 对结构化数据进行查询;
+ 支持多种开发语言;
+ 支持多达上百种的外部数据源,包括 HiveAvroParquetORCJSON 和 JDBC 等;
+ 支持 HiveQL 语法以及 Hive SerDes 和 UDF允许你访问现有的 Hive 仓库;
+ 支持标准的 JDBC 和 ODBC 连接;
+ 支持优化器,列式存储和代码生成等特性;
+ 支持扩展并能保证容错。
<div align="center"> <img src="../pictures/sql-hive-arch.png"/> </div>
## 二、DataFrame & DataSet
### 2.1 DataFrame
为了支持结构化数据的处理Spark SQL 提供了新的数据结构 DataFrame。DataFrame 是一个由具名列组成的数据集。它在概念上等同于关系数据库中的表或 R/Python 语言中的 `data frame`。 由于 Spark SQL 支持多种语言的开发,所以每种语言都定义了 `DataFrame` 的抽象,主要如下:
| 语言 | 主要抽象 |
| ------ | -------------------------------------------- |
| Scala | Dataset[T] & DataFrame (Dataset[Row] 的别名) |
| Java | Dataset[T] |
| Python | DataFrame |
| R | DataFrame |
### 2.2 DataFrame 对比 RDDs
DataFrame 和 RDDs 最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据,它们内部的数据结构如下:
<div align="center"> <img src="../pictures/spark-dataFrame+RDDs.png"/> </div>
DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。
**DataFrame 和 RDDs 应该如何选择?**
+ 如果你想使用函数式编程而不是 DataFrame API则使用 RDDs
+ 如果你的数据是非结构化的 (比如流媒体或者字符流),则使用 RDDs
+ 如果你的数据是结构化的 (如 RDBMS 中的数据) 或者半结构化的 (如日志),出于性能上的考虑,应优先使用 DataFrame。
### 2.3 DataSet
Dataset 也是分布式的数据集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame 的优点,具备强类型的特点,同时支持 Lambda 函数,但只能在 Scala 和 Java 语言中使用。在 Spark 2.0 后为了方便开发者Spark 将 DataFrame 和 Dataset 的 API 融合到一起,提供了结构化的 API(Structured API),即用户可以通过一套标准的 API 就能完成对两者的操作。
> 这里注意一下DataFrame 被标记为 Untyped API而 DataSet 被标记为 Typed API后文会对两者做出解释。
<div align="center"> <img width="600px" src="../pictures/spark-unifed.png"/> </div>
### 2.4 静态类型与运行时类型安全
静态类型 (Static-typing) 与运行时类型安全 (runtime type-safety) 主要表现如下:
在实际使用中,如果你用的是 Spark SQL 的查询语句,则直到运行时你才会发现有语法错误,而如果你用的是 DataFrame 和 Dataset则在编译时就可以发现错误 (这节省了开发时间和整体代价)。DataFrame 和 Dataset 主要区别在于:
在 DataFrame 中,当你调用了 API 之外的函数,编译器就会报错,但如果你使用了一个不存在的字段名字,编译器依然无法发现。而 Dataset 的 API 都是用 Lambda 函数和 JVM 类型对象表示的,所有不匹配的类型参数在编译时就会被发现。
以上这些最终都被解释成关于类型安全图谱对应开发中的语法和分析错误。在图谱中Dataset 最严格,但对于开发者来说效率最高。
<div align="center"> <img width="600px" src="../pictures/spark-运行安全.png"/> </div>
上面的描述可能并没有那么直观,下面的给出一个 IDEA 中代码编译的示例:
<div align="center"> <img src="../pictures/spark-运行时类型安全.png"/> </div>
这里一个可能的疑惑是 DataFrame 明明是有确定的 Scheme 结构 (即列名、列字段类型都是已知的),但是为什么还是无法对列名进行推断和错误判断,这是因为 DataFrame 是 Untyped 的。
### 2.5 Untyped & Typed
在上面我们介绍过 DataFrame API 被标记为 `Untyped API`,而 DataSet API 被标记为 `Typed API`。DataFrame 的 `Untyped` 是相对于语言或 API 层面而言,它确实有明确的 Scheme 结构,即列名,列类型都是确定的,但这些信息完全由 Spark 来维护Spark 只会在运行时检查这些类型和指定类型是否一致。这也就是为什么在 Spark 2.0 之后,官方推荐把 DataFrame 看做是 `DatSet[Row]`Row 是 Spark 中定义的一个 `trait`,其子类中封装了列字段的信息。
相对而言DataSet 是 `Typed`即强类型。如下面代码DataSet 的类型由 Case Class(Scala) 或者 Java Bean(Java) 来明确指定的,在这里即每一行数据代表一个 `Person`,这些信息由 JVM 来保证正确性,所以字段名错误和类型错误在编译的时候就会被 IDE 所发现。
```scala
case class Person(name: String, age: Long)
val dataSet: Dataset[Person] = spark.read.json("people.json").as[Person]
```
## 三、DataFrame & DataSet & RDDs 总结
这里对三者做一下简单的总结:
+ RDDs 适合非结构化数据的处理,而 DataFrame & DataSet 更适合结构化数据和半结构化的处理;
+ DataFrame & DataSet 可以通过统一的 Structured API 进行访问,而 RDDs 则更适合函数式编程的场景;
+ 相比于 DataFrame 而言DataSet 是强类型的 (Typed),有着更为严格的静态类型检查;
+ DataSets、DataFrames、SQL 的底层都依赖了 RDDs API并对外提供结构化的访问接口。
<div align="center"> <img width="600px" src="../pictures/spark-structure-api.png"/> </div>
## 四、Spark SQL的运行原理
DataFrame、DataSet 和 Spark SQL 的实际执行流程都是相同的:
1. 进行 DataFrame/Dataset/SQL 编程;
2. 如果是有效的代码即代码没有编译错误Spark 会将其转换为一个逻辑计划;
3. Spark 将此逻辑计划转换为物理计划,同时进行代码优化;
4. Spark 然后在集群上执行这个物理计划 (基于 RDD 操作) 。
### 4.1 逻辑计划(Logical Plan)
执行的第一个阶段是将用户代码转换成一个逻辑计划。它首先将用户代码转换成 `unresolved logical plan`(未解决的逻辑计划),之所以这个计划是未解决的,是因为尽管您的代码在语法上是正确的,但是它引用的表或列可能不存在。 Spark 使用 `analyzer`(分析器) 基于 `catalog`(存储的所有表和 `DataFrames` 的信息) 进行解析。解析失败则拒绝执行,解析成功则将结果传给 `Catalyst` 优化器 (`Catalyst Optimizer`),优化器是一组规则的集合,用于优化逻辑计划,通过谓词下推等方式进行优化,最终输出优化后的逻辑执行计划。
<div align="center"> <img src="../pictures/spark-Logical-Planning.png"/> </div>
### 4.2 物理计划(Physical Plan)
得到优化后的逻辑计划后Spark 就开始了物理计划过程。 它通过生成不同的物理执行策略,并通过成本模型来比较它们,从而选择一个最优的物理计划在集群上面执行的。物理规划的输出结果是一系列的 RDDs 和转换关系 (transformations)。
<div align="center"> <img src="../pictures/spark-Physical-Planning.png"/> </div>
### 4.3 执行
在选择一个物理计划后Spark 运行其 RDDs 代码,并在运行时执行进一步的优化,生成本地 Java 字节码,最后将运行结果返回给用户。
## 参考资料
1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
2. [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
3. [且谈 Apache Spark 的 API 三剑客RDD、DataFrame 和 Dataset(译文)](https://www.infoq.cn/article/three-apache-spark-apis-rdds-dataframes-and-datasets)
4. [A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets(原文)](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)