spark sql外部数据源

This commit is contained in:
罗祥 2019-05-24 15:42:36 +08:00
parent 9bbb86fdd2
commit d57c7927a8
13 changed files with 241 additions and 115 deletions

View File

@ -1,5 +1,43 @@
# Spark SQL 外部数据源 # Spark SQL 外部数据源
<nav>
<a href="#一简介">一、简介</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#11-多数据源支持">1.1 多数据源支持</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#12-读数据格式">1.2 读数据格式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#13-写数据格式">1.3 写数据格式</a><br/>
<a href="#二CSV">二、CSV</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-读取CSV文件">2.1 读取CSV文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-写入CSV文件">2.2 写入CSV文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-可选配置">2.3 可选配置</a><br/>
<a href="#三JSON">三、JSON</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-读取JSON文件">3.1 读取JSON文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-写入JSON文件">3.2 写入JSON文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#33-可选配置">3.3 可选配置</a><br/>
<a href="#四Parquet">四、Parquet</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-读取Parquet文件">4.1 读取Parquet文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-写入Parquet文件">2.2 写入Parquet文件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-可选配置">2.3 可选配置</a><br/>
<a href="#五ORC">五、ORC </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#51-读取ORC文件">5.1 读取ORC文件 </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42-写入ORC文件">4.2 写入ORC文件</a><br/>
<a href="#六SQL-Databases">六、SQL Databases </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#61-读取数据">6.1 读取数据</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#62-写入数据">6.2 写入数据</a><br/>
<a href="#七Text">七、Text </a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#71-读取Text数据">7.1 读取Text数据</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#72-写入Text数据">7.2 写入Text数据</a><br/>
<a href="#八数据读写高级特性">八、数据读写高级特性</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#81-并行读">8.1 并行读</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#82-并行写">8.2 并行写</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#83-分区写入">8.3 分区写入</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#83-分桶写入">8.3 分桶写入</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#85-文件大小管理">8.5 文件大小管理</a><br/>
<a href="#九可选配置附录">九、可选配置附录</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#91-CSV读写可选配置">9.1 CSV读写可选配置</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#92-JSON读写可选配置">9.2 JSON读写可选配置</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#93-数据库读写可选配置">9.3 数据库读写可选配置</a><br/>
</nav>
## 一、简介 ## 一、简介
### 1.1 多数据源支持 ### 1.1 多数据源支持
@ -13,9 +51,11 @@ Spark支持以下六个核心数据源同时Spark社区还提供了多达上
- JDBC/ODBC connections - JDBC/ODBC connections
- Plain-text files - Plain-text files
> 注:以下所有测试文件均可从本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录进行下载
### 1.2 读数据格式 ### 1.2 读数据格式
所有数据源读取API遵循以下调用格式: 所有读取API遵循以下调用格式
```scala ```scala
// 格式 // 格式
@ -48,19 +88,21 @@ DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(
//示例 //示例
dataframe.write.format("csv") dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式 .option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") .option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)") .option("path", "path/to/file(s)")
.save() .save()
``` ```
写数据模式有以下四种可选项:
| Scala/Java | 描述 | | Scala/Java | 描述 |
| :--------------------------------- | :----------------------------------------------------------- | | :----------------------- | :----------------------------------------------------------- |
| `SaveMode.ErrorIfExists`(默认模式) | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 | | `SaveMode.ErrorIfExists` | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 |
| `SaveMode.Append` | 数据以追加的方式写入 | | `SaveMode.Append` | 数据以追加的方式写入 |
| `SaveMode.Overwrite` | 数据以覆盖的方式写入 | | `SaveMode.Overwrite` | 数据以覆盖的方式写入 |
| `SaveMode.Ignore` | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. | | `SaveMode.Ignore` | 如果给定的路径已经存在文件,则不做任何操作 |
<br/>
## 二、CSV ## 二、CSV
@ -68,74 +110,86 @@ CSV是一种常见的文本文件格式其中每一行表示一条记录
### 2.1 读取CSV文件 ### 2.1 读取CSV文件
自动推断类型: 自动推断类型读取读取示例
```scala ```scala
spark.read.format("csv") spark.read.format("csv")
.option("header", "true") .option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") .option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") .option("inferSchema", "true") // 是否自动推断schema
.load("some/path/to/file.csv") .load("/usr/file/csv/dept.csv")
.show()
``` ```
使用预定义类型: 使用预定义类型:
```scala ```scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
LongType} //预定义数据格式
val myManualSchema = new StructType(Array( val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true), StructField("deptno", LongType, nullable = false),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true), StructField("dname", StringType,nullable = true),
new StructField("count", LongType, false) StructField("loc", StringType,nullable = true)
)) ))
spark.read.format("csv") spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST") .option("mode", "FAILFAST")
.schema(myManualSchema) .schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv") .load("/usr/file/csv/dept.csv")
.show(5) .show()
``` ```
### 2.2 写入CSV文件 ### 2.2 写入CSV文件
```scala ```scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.show(5)
``` ```
将csv文件转换为tsv文件 也可以指定具体的分隔符:
```scala ```scala
csvFile.write.format("csv").mode("overwrite").option("sep", "\t") df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
.save("/tmp/my-tsv-file.tsv")
``` ```
### 2.3 可选配置
为节省主文篇幅所有读写配置项见文末9.1小节。
<br/>
## 三、JSON ## 三、JSON
### 3.1 读取JSON文件 ### 3.1 读取JSON文件
```json ```json
spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema) spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
.load("/data/flight-data/json/2010-summary.json").show(5) ```
需要注意的是:默认不支持一条数据记录跨越多行(如下),可以通过配置`multiLine``true`来进行更改,其默认值为`false`
```json
// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//默认不支持多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
``` ```
### 3.2 写入JSON文件 ### 3.2 写入JSON文件
```scala ```scala
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json") df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
``` ```
### 3.3 可选配置
为节省主文篇幅所有读写配置项见文末9.2小节。
<br/>
## 四、Parquet ## 四、Parquet
Parquet是一个开源的面向列的数据存储它提供了多种存储优化允许读取单独的列非整个文件这不仅节省了存储空间而且提升了读取效率它是Spark是默认的文件格式。 Parquet是一个开源的面向列的数据存储它提供了多种存储优化允许读取单独的列非整个文件这不仅节省了存储空间而且提升了读取效率它是Spark是默认的文件格式。
@ -143,17 +197,13 @@ csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
### 4.1 读取Parquet文件 ### 4.1 读取Parquet文件
```scala ```scala
spark.read.format("parquet") spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
.load("/data/flight-data/parquet/2010-summary.parquet").show(5)
``` ```
需要注意的是:默认不支持一条数据记录跨越多行,可以通过配置`multiLine``true`来进行更改,其默认值为`false`
### 2.2 写入Parquet文件 ### 2.2 写入Parquet文件
```scala ```scala
csvFile.write.format("parquet").mode("overwrite") df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
.save("/tmp/my-parquet-file.parquet")
``` ```
### 2.3 可选配置 ### 2.3 可选配置
@ -165,117 +215,183 @@ Parquet文件有着自己的存储规则因此其可选配置项比较少
| 读 | compression or codec | None,<br/>uncompressed,<br/>bzip2,<br/>deflate, gzip,<br/>lz4, or snappy | None | 压缩文件格式 | | 读 | compression or codec | None,<br/>uncompressed,<br/>bzip2,<br/>deflate, gzip,<br/>lz4, or snappy | None | 压缩文件格式 |
| Read | mergeSchema | true, false | 取决于配置项`spark.sql.parquet.mergeSchema` | 当为真时Parquet数据源将从所有数据文件收集的Schema合并在一起否则将从摘要文件中选择Schema如果没有可用的摘要文件则从随机数据文件中选择Schema。 | | Read | mergeSchema | true, false | 取决于配置项`spark.sql.parquet.mergeSchema` | 当为真时Parquet数据源将从所有数据文件收集的Schema合并在一起否则将从摘要文件中选择Schema如果没有可用的摘要文件则从随机数据文件中选择Schema。 |
> 完整的配置列表可以参阅官方文档https://spark.apache.org/docs/latest/sql-data-sources-parquet.html > 更多可选配置可以参阅官方文档https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
<br/>
## 五、ORC ## 五、ORC
ORC是一种自描述的、类型感知的列文件格式它针对大型数据的读进行了优化,也是大数据中常用的文件格式。 ORC是一种自描述的、类型感知的列文件格式它针对大型数据的读进行了优化,也是大数据中常用的文件格式。
### 5.1 读取ORC文件 ### 5.1 读取ORC文件
```scala ```scala
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5) spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
``` ```
### 4.2 写入ORC文件 ### 4.2 写入ORC文件
```scala ```scala
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc") csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
``` ```
<br/>
## 六、SQL Databases ## 六、SQL Databases
spark同样支持与传统的关系型数据库进行数据读写。 spark同样支持与传统的关系型数据库进行数据读写。但是Spark程序默认是没有提供数据库驱动的所以在使用前需要将对应的数据库驱动上传到安装目录下的`jars`目录中。下面示例使用的是Mysql数据库使用前需要将对应的`mysql-connector-java-x.x.x.jar`上传到jars目录下。
### 6.1 获取数据库连接 > 为方便大家测试,我在本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录下上传了mysql-connector-java-5.1.47.jar
### 6.1 读取数据
读取全表数据示例如下,这里的`help_keyword`是mysql内置的字典表只有`help_keyword_id``name`两个字段。
```scala ```scala
import java.sql.DriverManager spark.read
val connection = DriverManager.getConnection(url)
connection.isClosed()
connection.close()
```
### 6.2 读取数据
读取全表数据:
```scala
val pgDF = spark.read
.format("jdbc") .format("jdbc")
.option("driver", "org.postgresql.Driver") .option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:postgresql://database_server") .option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "schema.tablename") .option("dbtable", "help_keyword") //表名
.option("user", "username").option("password","my-secret-password").load() .option("user", "root").option("password","root").load().show(10)
``` ```
读取过滤后的数据: 从查询结果读取数据:
```scala ```scala
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
AS flight_info""" spark.read.format("jdbc")
val dbDataFrame = spark.read.format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver) .option("driver", "com.mysql.jdbc.Driver")
.load() .option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//输出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
``` ```
也可以使用如下的写法进行数据的过滤:
```scala ```scala
val props = new java.util.Properties val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC") props.setProperty("driver", "com.mysql.jdbc.Driver")
val predicates = Array( props.setProperty("user", "root")
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'", props.setProperty("password", "root")
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'") val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定数据过滤条件
spark.read.jdbc(url, tablename, predicates, props).count() // 510 spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()
//输出:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
``` ```
并行读取数据: 可以使用`numPartitions`指定读取数据的并行度
```scala ```scala
val dbDataFrame = spark.read.format("jdbc") option("numPartitions", 10)
.option("url", url).option("dbtable", tablename).option("driver", driver)
.option("numPartitions", 10).load()
``` ```
在这里,我们对第一个分区和最后一个分区分别指定了最小值和最大值。任何超出这些界限的都在第一个分区或最后一个分区中。然后,我们设置希望的分区总数(这是并行度的级别)。 在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中
```scala ```scala
val colName = "count" val colName = "help_keyword_id" //用于判断上下界的列
val lowerBound = 0L val lowerBound = 300L //下界
val upperBound = 348113L // this is the max count in our database val upperBound = 500L //上界
val numPartitions = 10 val numPartitions = 10 //分区综述
spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props) val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
.count() // 255 colName,lowerBound,upperBound,numPartitions,props)
``` ```
想要验证分区内容,可以使用`mapPartitionsWithIndex`这个算子,代码如下:
### 6.3 写入数据
```scala ```scala
val newPath = "jdbc:sqlite://tmp/my-sqlite.db" jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props) val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分区:" + iterator.next())
}
buffer.toIterator
}).foreach(println)
``` ```
执行结果如下:`help_keyword`这张表只有600条左右的数据本来数据应该均匀分布在10个分区但是0分区里面却有319条数据这是因为设置了下限所有小于300的数据都会被限制在第一个分区即0分区。同理所有大于500的数据被分配在9分区即最后一个分区。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-mysql-分区上下限.png"/> </div>
### 6.2 写入数据
```scala
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()
```
<br/>
## 七、Text ## 七、Text
text文件在读写性能方面并没有任何优势且不能表达明确的数据结构所以其使用的比较少读写操作如下
### 7.1 读取Text数据 ### 7.1 读取Text数据
```scala ```scala
spark.read.textFile("/data/flight-data/csv/2010-summary.csv") spark.read.textFile("/usr/file/txt/dept.txt").show()
.selectExpr("split(value, ',') as rows").show()
``` ```
### 7.2 写入Text数据 ### 7.2 写入Text数据
```scala ```scala
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt") df.write.text("/tmp/spark/txt/dept")
``` ```
## 八、数据读写高级概念 <br/>
## 八、数据读写高级特性
### 8.1 并行读 ### 8.1 并行读
多个executors不能同时读取同一个文件但它们可以同时读取不同的文件。一般来说,这意味着当您从一个包含多个文件的文件夹中读取数据时这些文件中的每一个都将成为DataFrame中的一个分区并由可用的executors并行读取。 多个executors不能同时读取同一个文件但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时这些文件中的每一个都将成为DataFrame中的一个分区并由可用的executors并行读取。
### 8.2 并行写 ### 8.2 并行写
@ -283,35 +399,42 @@ csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")
### 8.3 分区写入 ### 8.3 分区写入
分区和分桶这两个概念和Hive中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是`partitionBy`指定的分区和RDD中分区不是一个概念这里的**分区表现为输出目录的子目录**,数据分别存储在对应的子目录中。
```scala ```scala
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME") val df = spark.read.format("json").load("/usr/file/json/emp.json")
.save("/tmp/partitioned-files.parquet") df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
``` ```
输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-分区.png"/> </div>
### 8.3 分桶写入 ### 8.3 分桶写入
分桶写入就是将数据按照指定的列和桶数进行散列目前分桶写入只支持保存为表实际上这就是Hive的分桶表。
```scala ```scala
val numberBuckets = 10 val numberBuckets = 10
val columnToBucketBy = "count" val columnToBucketBy = "empno"
csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles") df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
``` ```
### 8.5 文件大小管理 ### 8.5 文件大小管理
如果写入产生大量小文件这时会产生大量的元数据开销。Spark和HDFS一样都不能很好的处理这个问题这被称为“small file problem”。同时数据文件也不能过大否则在查询时会有不必要的性能开销因此要把文件大小控制在一个合理的范围内。 如果写入产生小文件数量过多这时会产生大量的元数据开销。Spark和HDFS一样都不能很好的处理这个问题这被称为“small file problem”。同时数据文件也不能过大否则在查询时会有不必要的性能开销因此要把文件大小控制在一个合理的范围内。
在上文我们已经介绍过可以通过分区数量来控制生成文件的数量从而间接控制文件大小。Spark 2.2引入了一种新的方法,以更自动化的方式控制文件大小,这就是`maxRecordsPerFile`参数,它允许你通过控制写入文件的记录数来控制文件大小。 在上文我们已经介绍过可以通过分区数量来控制生成文件的数量从而间接控制文件大小。Spark 2.2引入了一种新的方法,以更自动化的方式控制文件大小,这就是`maxRecordsPerFile`参数,它允许你通过控制写入文件的记录数来控制文件大小。
```scala ```scala
// Spark将确保文件最多包含5000条记录 // Spark将确保文件最多包含5000条记录
df.write.option(“maxRecordsPerFile”, 5000) df.write.option(“maxRecordsPerFile”, 5000)
``` ```
<br>
## 九、可选配置附录
## 九、附录
### 9.1 CSV读写可选配置 ### 9.1 CSV读写可选配置
@ -368,13 +491,12 @@ csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, column
| createTableOptions | 写入数据时自定义创建表的相关配置 | | createTableOptions | 写入数据时自定义创建表的相关配置 |
| createTableColumnTypes | 写入数据时自定义创建列的列类型 | | createTableColumnTypes | 写入数据时自定义创建列的列类型 |
> 完整的配置可以参阅官方文档https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html > 数据库读写更多配置可以参阅官方文档https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
## 参考资料 ## 参考资料
1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02 1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
2. https://spark.apache.org/docs/latest/sql-data-sources.html

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

BIN
pictures/spark-分区.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

4
resources/csv/dept.csv Normal file
View File

@ -0,0 +1,4 @@
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
1 10 ACCOUNTING NEW YORK
2 20 RESEARCH DALLAS
3 30 SALES CHICAGO
4 40 OPERATIONS BOSTON

BIN
resources/orc/dept.orc Normal file

Binary file not shown.