From d57c7927a8d1ba8e80d0879cb5c3ea68d5b07e53 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com>
Date: Fri, 24 May 2019 15:42:36 +0800
Subject: [PATCH] =?UTF-8?q?spark=20sql=E5=A4=96=E9=83=A8=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E6=BA=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
notes/SparkSQL外部数据源.md | 352 ++++++++++++------
pictures/spark-mysql-分区上下限.png | Bin 0 -> 47778 bytes
pictures/spark-分区.png | Bin 0 -> 17646 bytes
resources/csv/dept.csv | 4 +
resources/orc/dept.orc | Bin 0 -> 552 bytes
...fa9ad-c000.snappy.parquet => dept.parquet} | Bin
resources/parquet/dept.parquet/._SUCCESS.crc | Bin 8 -> 0 bytes
...-b2f4-b780aecfa9ad-c000.snappy.parquet.crc | Bin 16 -> 0 bytes
resources/parquet/dept.parquet/_SUCCESS | 0
...e2ad66-c000.snappy.parquet => emp.parquet} | Bin
resources/parquet/emp.parquet/._SUCCESS.crc | Bin 8 -> 0 bytes
...-b8bc-0a768ae2ad66-c000.snappy.parquet.crc | Bin 32 -> 0 bytes
resources/parquet/emp.parquet/_SUCCESS | 0
13 files changed, 241 insertions(+), 115 deletions(-)
create mode 100644 pictures/spark-mysql-分区上下限.png
create mode 100644 pictures/spark-分区.png
create mode 100644 resources/csv/dept.csv
create mode 100644 resources/orc/dept.orc
rename resources/parquet/{dept.parquet/part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet => dept.parquet} (100%)
delete mode 100644 resources/parquet/dept.parquet/._SUCCESS.crc
delete mode 100644 resources/parquet/dept.parquet/.part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet.crc
delete mode 100644 resources/parquet/dept.parquet/_SUCCESS
rename resources/parquet/{emp.parquet/part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet => emp.parquet} (100%)
delete mode 100644 resources/parquet/emp.parquet/._SUCCESS.crc
delete mode 100644 resources/parquet/emp.parquet/.part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet.crc
delete mode 100644 resources/parquet/emp.parquet/_SUCCESS
diff --git a/notes/SparkSQL外部数据源.md b/notes/SparkSQL外部数据源.md
index f7939ff..fe1d333 100644
--- a/notes/SparkSQL外部数据源.md
+++ b/notes/SparkSQL外部数据源.md
@@ -1,5 +1,43 @@
# Spark SQL 外部数据源
+
+
## 一、简介
### 1.1 多数据源支持
@@ -11,11 +49,13 @@ Spark支持以下六个核心数据源,同时Spark社区还提供了多达上
- Parquet
- ORC
- JDBC/ODBC connections
-- Plain-text files
+- Plain-text files
+
+> 注:以下所有测试文件均可从本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录进行下载
### 1.2 读数据格式
-所有数据源读取API都遵循以下调用格式:
+所有读取API遵循以下调用格式:
```scala
// 格式
@@ -47,20 +87,22 @@ DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(
//示例
dataframe.write.format("csv")
-.option("mode", "OVERWRITE") //写模式
-.option("dateFormat", "yyyy-MM-dd")
+.option("mode", "OVERWRITE") //写模式
+.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
```
-| Scala/Java | 描述 |
-| :--------------------------------- | :----------------------------------------------------------- |
-| `SaveMode.ErrorIfExists`(默认模式) | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 |
-| `SaveMode.Append` | 数据以追加的方式写入 |
-| `SaveMode.Overwrite` | 数据以覆盖的方式写入 |
-| `SaveMode.Ignore` | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. |
+写数据模式有以下四种可选项:
+| Scala/Java | 描述 |
+| :----------------------- | :----------------------------------------------------------- |
+| `SaveMode.ErrorIfExists` | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 |
+| `SaveMode.Append` | 数据以追加的方式写入 |
+| `SaveMode.Overwrite` | 数据以覆盖的方式写入 |
+| `SaveMode.Ignore` | 如果给定的路径已经存在文件,则不做任何操作 |
+
## 二、CSV
@@ -68,74 +110,86 @@ CSV是一种常见的文本文件格式,其中每一行表示一条记录,
### 2.1 读取CSV文件
-自动推断类型:
+自动推断类型读取读取示例:
```scala
spark.read.format("csv")
-.option("header", "true")
-.option("mode", "FAILFAST")
-.option("inferSchema", "true")
-.load("some/path/to/file.csv")
+.option("header", "false") // 文件中的第一行是否为列的名称
+.option("mode", "FAILFAST") // 是否快速失败
+.option("inferSchema", "true") // 是否自动推断schema
+.load("/usr/file/csv/dept.csv")
+.show()
```
使用预定义类型:
```scala
-import org.apache.spark.sql.types.{StructField, StructType, StringType,
-LongType}
+import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
+//预定义数据格式
val myManualSchema = new StructType(Array(
-new StructField("DEST_COUNTRY_NAME", StringType, true),
-new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
-new StructField("count", LongType, false)
+ StructField("deptno", LongType, nullable = false),
+ StructField("dname", StringType,nullable = true),
+ StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
-.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
-.load("/data/flight-data/csv/2010-summary.csv")
-.show(5)
+.load("/usr/file/csv/dept.csv")
+.show()
```
### 2.2 写入CSV文件
```scala
-import org.apache.spark.sql.types.{StructField, StructType, StringType,
-LongType}
-val myManualSchema = new StructType(Array(
-new StructField("DEST_COUNTRY_NAME", StringType, true),
-new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
-new StructField("count", LongType, false)
-))
-spark.read.format("csv")
-.option("header", "true")
-.option("mode", "FAILFAST")
-.schema(myManualSchema)
-.load("/data/flight-data/csv/2010-summary.csv")
-.show(5)
+df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
```
-将csv文件,转换为tsv文件:
+也可以指定具体的分隔符:
```scala
-csvFile.write.format("csv").mode("overwrite").option("sep", "\t")
-.save("/tmp/my-tsv-file.tsv")
+df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
```
+### 2.3 可选配置
+
+为节省主文篇幅,所有读写配置项见文末9.1小节。
+
+
+
## 三、JSON
### 3.1 读取JSON文件
```json
-spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
-.load("/data/flight-data/json/2010-summary.json").show(5)
+spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
+```
+
+需要注意的是:默认不支持一条数据记录跨越多行(如下),可以通过配置`multiLine`为`true`来进行更改,其默认值为`false`。
+
+```json
+// 默认支持单行
+{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
+
+//默认不支持多行
+{
+ "DEPTNO": 10,
+ "DNAME": "ACCOUNTING",
+ "LOC": "NEW YORK"
+}
```
### 3.2 写入JSON文件
```scala
-csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
+df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
```
+### 3.3 可选配置
+
+为节省主文篇幅,所有读写配置项见文末9.2小节。
+
+
+
## 四、Parquet
Parquet是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是Spark是默认的文件格式。
@@ -143,17 +197,13 @@ csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
### 4.1 读取Parquet文件
```scala
-spark.read.format("parquet")
-.load("/data/flight-data/parquet/2010-summary.parquet").show(5)
+spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
```
-需要注意的是:默认不支持一条数据记录跨越多行,可以通过配置`multiLine`为`true`来进行更改,其默认值为`false`。
-
### 2.2 写入Parquet文件
```scala
-csvFile.write.format("parquet").mode("overwrite")
-.save("/tmp/my-parquet-file.parquet")
+df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
```
### 2.3 可选配置
@@ -165,117 +215,183 @@ Parquet文件有着自己的存储规则,因此其可选配置项比较少,
| 读 | compression or codec | None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy | None | 压缩文件格式 |
| Read | mergeSchema | true, false | 取决于配置项`spark.sql.parquet.mergeSchema` | 当为真时,Parquet数据源将从所有数据文件收集的Schema合并在一起,否则将从摘要文件中选择Schema,如果没有可用的摘要文件,则从随机数据文件中选择Schema。 |
-> 完整的配置列表可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
+> 更多可选配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
+
+
## 五、ORC
-ORC是一种自描述的、类型感知的列文件格式,它针对大型数据的读取进行了优化,也是大数据中常用的文件格式。
+ORC是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。
### 5.1 读取ORC文件
```scala
-spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)
+spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
```
### 4.2 写入ORC文件
```scala
-csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
+csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
```
+
+
## 六、SQL Databases
-spark同样支持与传统的关系型数据库进行数据读写。
+spark同样支持与传统的关系型数据库进行数据读写。但是Spark程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的`jars`目录中。下面示例使用的是Mysql数据库,使用前需要将对应的`mysql-connector-java-x.x.x.jar`上传到jars目录下。
-### 6.1 获取数据库连接
+> 为方便大家测试,我在本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录下上传了mysql-connector-java-5.1.47.jar
+
+### 6.1 读取数据
+
+读取全表数据示例如下,这里的`help_keyword`是mysql内置的字典表,只有`help_keyword_id`和`name`两个字段。
```scala
-import java.sql.DriverManager
-val connection = DriverManager.getConnection(url)
-connection.isClosed()
-connection.close()
-```
-
-### 6.2 读取数据
-
-读取全表数据:
-
-```scala
-val pgDF = spark.read
+spark.read
.format("jdbc")
-.option("driver", "org.postgresql.Driver")
-.option("url", "jdbc:postgresql://database_server")
-.option("dbtable", "schema.tablename")
-.option("user", "username").option("password","my-secret-password").load()
+.option("driver", "com.mysql.jdbc.Driver") //驱动
+.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
+.option("dbtable", "help_keyword") //表名
+.option("user", "root").option("password","root").load().show(10)
```
-读取过滤后的数据:
+从查询结果读取数据:
```scala
-val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
-AS flight_info"""
-val dbDataFrame = spark.read.format("jdbc")
-.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
-.load()
+val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
+spark.read.format("jdbc")
+.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
+.option("driver", "com.mysql.jdbc.Driver")
+.option("user", "root").option("password", "root")
+.option("dbtable", pushDownQuery)
+.load().show()
+
+//输出
++---------------+-----------+
+|help_keyword_id| name|
++---------------+-----------+
+| 0| <>|
+| 1| ACTION|
+| 2| ADD|
+| 3|AES_DECRYPT|
+| 4|AES_ENCRYPT|
+| 5| AFTER|
+| 6| AGAINST|
+| 7| AGGREGATE|
+| 8| ALGORITHM|
+| 9| ALL|
+| 10| ALTER|
+| 11| ANALYSE|
+| 12| ANALYZE|
+| 13| AND|
+| 14| ARCHIVE|
+| 15| AREA|
+| 16| AS|
+| 17| ASBINARY|
+| 18| ASC|
+| 19| ASTEXT|
++---------------+-----------+
```
+也可以使用如下的写法进行数据的过滤:
+
```scala
val props = new java.util.Properties
-props.setProperty("driver", "org.sqlite.JDBC")
-val predicates = Array(
-"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
-"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
-spark.read.jdbc(url, tablename, predicates, props).count() // 510
+props.setProperty("driver", "com.mysql.jdbc.Driver")
+props.setProperty("user", "root")
+props.setProperty("password", "root")
+val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定数据过滤条件
+spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()
+
+//输出:
++---------------+-----------+
+|help_keyword_id| name|
++---------------+-----------+
+| 0| <>|
+| 1| ACTION|
+| 2| ADD|
+| 3|AES_DECRYPT|
+| 4|AES_ENCRYPT|
+| 5| AFTER|
+| 6| AGAINST|
+| 7| AGGREGATE|
+| 8| ALGORITHM|
+| 9| ALL|
+| 604| WHEN|
++---------------+-----------+
```
-并行读取数据:
+可以使用`numPartitions`指定读取数据的并行度:
```scala
-val dbDataFrame = spark.read.format("jdbc")
-.option("url", url).option("dbtable", tablename).option("driver", driver)
-.option("numPartitions", 10).load()
+option("numPartitions", 10)
```
-在这里,我们对第一个分区和最后一个分区分别指定了最小值和最大值。任何超出这些界限的都在第一个分区或最后一个分区中。然后,我们设置希望的分区总数(这是并行度的级别)。
+在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。
```scala
-val colName = "count"
-val lowerBound = 0L
-val upperBound = 348113L // this is the max count in our database
-val numPartitions = 10
-spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props)
-.count() // 255
+val colName = "help_keyword_id" //用于判断上下界的列
+val lowerBound = 300L //下界
+val upperBound = 500L //上界
+val numPartitions = 10 //分区综述
+val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
+ colName,lowerBound,upperBound,numPartitions,props)
```
-
-
-### 6.3 写入数据
+想要验证分区内容,可以使用`mapPartitionsWithIndex`这个算子,代码如下:
```scala
-val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
-csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
+jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
+ val buffer = new ListBuffer[String]
+ while (iterator.hasNext) {
+ buffer.append(index + "分区:" + iterator.next())
+ }
+ buffer.toIterator
+}).foreach(println)
```
+执行结果如下:`help_keyword`这张表只有600条左右的数据,本来数据应该均匀分布在10个分区,但是0分区里面却有319条数据,这是因为设置了下限,所有小于300的数据都会被限制在第一个分区,即0分区。同理所有大于500的数据被分配在9分区,即最后一个分区。
+
+
Yat;Xz@kHow(>A@uEC}dnqt=~)fuF$PF^lG1cQFyWnueIz4a0bRhg5RS<=A4fVa8W
zWr7UDCfXpS{TH`uB$uMr;mJJ(`8n?WgvX8Zb;B&X#cU~*-{wfk>p7gRhMw|F-f!wx
zdea?b8$5*T4X)2@8Rfs4i_Mp2a`t|C+a)jOiPGzqVwu-^T&@Lc-y|k#|K)LLMLxgD
zc721l&ymw=r2q(a4I)1m*tB7(4E+cuTF}e2iK8^OI6b=x(p=yJo;n}b4)32VrK#)S
z^M@H?3=L>H0kO?VPRuSvoL+~K8Y2>9>;b14_Koh0J(NL*5#;V>rPW8m%zXR3xmmZ#
zmgW`%Lz?La@$UR2)g$*ljJTS|d2#*{@2hM3bPIWKb3)_F@y1618>TuunJSqqq;EJ4
z%AUy}%p}!A&ubGxT*qC?TmUU3J2H0-We=fzD<|$BKhNWL#Me?1h?7d$gGF~B3JL}l
zPL`X9*j@IRPZ-=x_ihh;k%85TND@m{>tPT|k{YVLJ9~cOoFg1KN6~3k7XRv1`wHQx
zLcf~f
Er
U-ZTu3DGYaO+w)=@RL6GGm|2qA@NYxWUzgEG1pNQls
z7($PiOpvze(()d6
#&G!V9)8>82P(J!kw{^B*+FOwCW?AH9lli*>y~NC*SDN
zIIBzg-V?q1)$qqx4r^KEO*-J6W17%6-|kr9Er)VLsuR$6*|>;B_nK^bdgQ1NTUb)Q
zzhg