# Spark SQL 外部数据源
## 一、简介
### 1.1 多数据源支持
Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景。
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
> 注:以下所有测试文件均可从本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources) 目录进行下载
### 1.2 读数据格式
所有读取 API 遵循以下调用格式:
```scala
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 读取模式
.option("inferSchema", "true") // 是否自动推断 schema
.option("path", "path/to/file(s)") // 文件路径
.schema(someSchema) // 使用预定义的 schema
.load()
```
读取模式有以下三种可选项:
| 读模式 | 描述 |
| --------------- | ------------------------------------------------------------ |
| `permissive` | 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中 |
| `dropMalformed` | 删除格式不正确的行 |
| `failFast` | 遇到格式不正确的数据时立即失败 |
### 1.3 写数据格式
```scala
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
```
写数据模式有以下四种可选项:
| Scala/Java | 描述 |
| :----------------------- | :----------------------------------------------------------- |
| `SaveMode.ErrorIfExists` | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 |
| `SaveMode.Append` | 数据以追加的方式写入 |
| `SaveMode.Overwrite` | 数据以覆盖的方式写入 |
| `SaveMode.Ignore` | 如果给定的路径已经存在文件,则不做任何操作 |
## 二、CSV
CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每个字段用逗号分隔。
### 2.1 读取CSV文件
自动推断类型读取读取示例:
```scala
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
```
使用预定义类型:
```scala
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预定义数据格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
```
### 2.2 写入CSV文件
```scala
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
```
也可以指定具体的分隔符:
```scala
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
```
### 2.3 可选配置
为节省主文篇幅,所有读写配置项见文末 9.1 小节。
## 三、JSON
### 3.1 读取JSON文件
```json
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
```
需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 `multiLine` 为 `true` 来进行更改,其默认值为 `false`。
```json
// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//默认不支持多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
```
### 3.2 写入JSON文件
```scala
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
```
### 3.3 可选配置
为节省主文篇幅,所有读写配置项见文末 9.2 小节。
## 四、Parquet
Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是 Spark 是默认的文件格式。
### 4.1 读取Parquet文件
```scala
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
```
### 2.2 写入Parquet文件
```scala
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
```
### 2.3 可选配置
Parquet 文件有着自己的存储规则,因此其可选配置项比较少,常用的有如下两个:
| 读写操作 | 配置项 | 可选值 | 默认值 | 描述 |
| -------- | -------------------- | ------------------------------------------------------------ | ------------------------------------------- | ------------------------------------------------------------ |
| Write | compression or codec | None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy | None | 压缩文件格式 |
| Read | mergeSchema | true, false | 取决于配置项 `spark.sql.parquet.mergeSchema` | 当为真时,Parquet 数据源将所有数据文件收集的 Schema 合并在一起,否则将从摘要文件中选择 Schema,如果没有可用的摘要文件,则从随机数据文件中选择 Schema。 |
> 更多可选配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
## 五、ORC
ORC 是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。
### 5.1 读取ORC文件
```scala
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
```
### 4.2 写入ORC文件
```scala
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
```
## 六、SQL Databases
Spark 同样支持与传统的关系型数据库进行数据读写。但是 Spark 程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的 `jars` 目录中。下面示例使用的是 Mysql 数据库,使用前需要将对应的 `mysql-connector-java-x.x.x.jar` 上传到 `jars` 目录下。
### 6.1 读取数据
读取全表数据示例如下,这里的 `help_keyword` 是 mysql 内置的字典表,只有 `help_keyword_id` 和 `name` 两个字段。
```scala
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
```
从查询结果读取数据:
```scala
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//输出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
```
也可以使用如下的写法进行数据的过滤:
```scala
val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定数据过滤条件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()
//输出:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
```
可以使用 `numPartitions` 指定读取数据的并行度:
```scala
option("numPartitions", 10)
```
在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。
```scala
val colName = "help_keyword_id" //用于判断上下界的列
val lowerBound = 300L //下界
val upperBound = 500L //上界
val numPartitions = 10 //分区综述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
colName,lowerBound,upperBound,numPartitions,props)
```
想要验证分区内容,可以使用 `mapPartitionsWithIndex` 这个算子,代码如下:
```scala
jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分区:" + iterator.next())
}
buffer.toIterator
}).foreach(println)
```
执行结果如下:`help_keyword` 这张表只有 600 条左右的数据,本来数据应该均匀分布在 10 个分区,但是 0 分区里面却有 319 条数据,这是因为设置了下限,所有小于 300 的数据都会被限制在第一个分区,即 0 分区。同理所有大于 500 的数据被分配在 9 分区,即最后一个分区。