diff --git a/notes/SparkSQL外部数据源.md b/notes/SparkSQL外部数据源.md index f7939ff..fe1d333 100644 --- a/notes/SparkSQL外部数据源.md +++ b/notes/SparkSQL外部数据源.md @@ -1,5 +1,43 @@ # Spark SQL 外部数据源 + + ## 一、简介 ### 1.1 多数据源支持 @@ -11,11 +49,13 @@ Spark支持以下六个核心数据源,同时Spark社区还提供了多达上 - Parquet - ORC - JDBC/ODBC connections -- Plain-text files +- Plain-text files + +> 注:以下所有测试文件均可从本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录进行下载 ### 1.2 读数据格式 -所有数据源读取API都遵循以下调用格式: +所有读取API遵循以下调用格式: ```scala // 格式 @@ -47,20 +87,22 @@ DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( //示例 dataframe.write.format("csv") -.option("mode", "OVERWRITE") //写模式 -.option("dateFormat", "yyyy-MM-dd") +.option("mode", "OVERWRITE") //写模式 +.option("dateFormat", "yyyy-MM-dd") //日期格式 .option("path", "path/to/file(s)") .save() ``` -| Scala/Java | 描述 | -| :--------------------------------- | :----------------------------------------------------------- | -| `SaveMode.ErrorIfExists`(默认模式) | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 | -| `SaveMode.Append` | 数据以追加的方式写入 | -| `SaveMode.Overwrite` | 数据以覆盖的方式写入 | -| `SaveMode.Ignore` | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. | +写数据模式有以下四种可选项: +| Scala/Java | 描述 | +| :----------------------- | :----------------------------------------------------------- | +| `SaveMode.ErrorIfExists` | 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式 | +| `SaveMode.Append` | 数据以追加的方式写入 | +| `SaveMode.Overwrite` | 数据以覆盖的方式写入 | +| `SaveMode.Ignore` | 如果给定的路径已经存在文件,则不做任何操作 | +
## 二、CSV @@ -68,74 +110,86 @@ CSV是一种常见的文本文件格式,其中每一行表示一条记录, ### 2.1 读取CSV文件 -自动推断类型: +自动推断类型读取读取示例: ```scala spark.read.format("csv") -.option("header", "true") -.option("mode", "FAILFAST") -.option("inferSchema", "true") -.load("some/path/to/file.csv") +.option("header", "false") // 文件中的第一行是否为列的名称 +.option("mode", "FAILFAST") // 是否快速失败 +.option("inferSchema", "true") // 是否自动推断schema +.load("/usr/file/csv/dept.csv") +.show() ``` 使用预定义类型: ```scala -import org.apache.spark.sql.types.{StructField, StructType, StringType, -LongType} +import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType} +//预定义数据格式 val myManualSchema = new StructType(Array( -new StructField("DEST_COUNTRY_NAME", StringType, true), -new StructField("ORIGIN_COUNTRY_NAME", StringType, true), -new StructField("count", LongType, false) + StructField("deptno", LongType, nullable = false), + StructField("dname", StringType,nullable = true), + StructField("loc", StringType,nullable = true) )) spark.read.format("csv") -.option("header", "true") .option("mode", "FAILFAST") .schema(myManualSchema) -.load("/data/flight-data/csv/2010-summary.csv") -.show(5) +.load("/usr/file/csv/dept.csv") +.show() ``` ### 2.2 写入CSV文件 ```scala -import org.apache.spark.sql.types.{StructField, StructType, StringType, -LongType} -val myManualSchema = new StructType(Array( -new StructField("DEST_COUNTRY_NAME", StringType, true), -new StructField("ORIGIN_COUNTRY_NAME", StringType, true), -new StructField("count", LongType, false) -)) -spark.read.format("csv") -.option("header", "true") -.option("mode", "FAILFAST") -.schema(myManualSchema) -.load("/data/flight-data/csv/2010-summary.csv") -.show(5) +df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2") ``` -将csv文件,转换为tsv文件: +也可以指定具体的分隔符: ```scala -csvFile.write.format("csv").mode("overwrite").option("sep", "\t") -.save("/tmp/my-tsv-file.tsv") +df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2") ``` +### 2.3 可选配置 + +为节省主文篇幅,所有读写配置项见文末9.1小节。 + +
+ ## 三、JSON ### 3.1 读取JSON文件 ```json -spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema) -.load("/data/flight-data/json/2010-summary.json").show(5) +spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5) +``` + +需要注意的是:默认不支持一条数据记录跨越多行(如下),可以通过配置`multiLine`为`true`来进行更改,其默认值为`false`。 + +```json +// 默认支持单行 +{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"} + +//默认不支持多行 +{ + "DEPTNO": 10, + "DNAME": "ACCOUNTING", + "LOC": "NEW YORK" +} ``` ### 3.2 写入JSON文件 ```scala -csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json") +df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept") ``` +### 3.3 可选配置 + +为节省主文篇幅,所有读写配置项见文末9.2小节。 + +
+ ## 四、Parquet Parquet是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是Spark是默认的文件格式。 @@ -143,17 +197,13 @@ csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json") ### 4.1 读取Parquet文件 ```scala -spark.read.format("parquet") -.load("/data/flight-data/parquet/2010-summary.parquet").show(5) +spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5) ``` -需要注意的是:默认不支持一条数据记录跨越多行,可以通过配置`multiLine`为`true`来进行更改,其默认值为`false`。 - ### 2.2 写入Parquet文件 ```scala -csvFile.write.format("parquet").mode("overwrite") -.save("/tmp/my-parquet-file.parquet") +df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept") ``` ### 2.3 可选配置 @@ -165,117 +215,183 @@ Parquet文件有着自己的存储规则,因此其可选配置项比较少, | 读 | compression or codec | None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy | None | 压缩文件格式 | | Read | mergeSchema | true, false | 取决于配置项`spark.sql.parquet.mergeSchema` | 当为真时,Parquet数据源将从所有数据文件收集的Schema合并在一起,否则将从摘要文件中选择Schema,如果没有可用的摘要文件,则从随机数据文件中选择Schema。 | -> 完整的配置列表可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html +> 更多可选配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html + +
## 五、ORC -ORC是一种自描述的、类型感知的列文件格式,它针对大型数据的读取进行了优化,也是大数据中常用的文件格式。 +ORC是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。 ### 5.1 读取ORC文件 ```scala -spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5) +spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5) ``` ### 4.2 写入ORC文件 ```scala -csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc") +csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept") ``` +
+ ## 六、SQL Databases -spark同样支持与传统的关系型数据库进行数据读写。 +spark同样支持与传统的关系型数据库进行数据读写。但是Spark程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的`jars`目录中。下面示例使用的是Mysql数据库,使用前需要将对应的`mysql-connector-java-x.x.x.jar`上传到jars目录下。 -### 6.1 获取数据库连接 +> 为方便大家测试,我在本仓库的[resources](https://github.com/heibaiying/BigData-Notes/tree/master/resources)目录下上传了mysql-connector-java-5.1.47.jar + +### 6.1 读取数据 + +读取全表数据示例如下,这里的`help_keyword`是mysql内置的字典表,只有`help_keyword_id`和`name`两个字段。 ```scala -import java.sql.DriverManager -val connection = DriverManager.getConnection(url) -connection.isClosed() -connection.close() -``` - -### 6.2 读取数据 - -读取全表数据: - -```scala -val pgDF = spark.read +spark.read .format("jdbc") -.option("driver", "org.postgresql.Driver") -.option("url", "jdbc:postgresql://database_server") -.option("dbtable", "schema.tablename") -.option("user", "username").option("password","my-secret-password").load() +.option("driver", "com.mysql.jdbc.Driver") //驱动 +.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址 +.option("dbtable", "help_keyword") //表名 +.option("user", "root").option("password","root").load().show(10) ``` -读取过滤后的数据: +从查询结果读取数据: ```scala -val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) -AS flight_info""" -val dbDataFrame = spark.read.format("jdbc") -.option("url", url).option("dbtable", pushdownQuery).option("driver", driver) -.load() +val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords""" +spark.read.format("jdbc") +.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") +.option("driver", "com.mysql.jdbc.Driver") +.option("user", "root").option("password", "root") +.option("dbtable", pushDownQuery) +.load().show() + +//输出 ++---------------+-----------+ +|help_keyword_id| name| ++---------------+-----------+ +| 0| <>| +| 1| ACTION| +| 2| ADD| +| 3|AES_DECRYPT| +| 4|AES_ENCRYPT| +| 5| AFTER| +| 6| AGAINST| +| 7| AGGREGATE| +| 8| ALGORITHM| +| 9| ALL| +| 10| ALTER| +| 11| ANALYSE| +| 12| ANALYZE| +| 13| AND| +| 14| ARCHIVE| +| 15| AREA| +| 16| AS| +| 17| ASBINARY| +| 18| ASC| +| 19| ASTEXT| ++---------------+-----------+ ``` +也可以使用如下的写法进行数据的过滤: + ```scala val props = new java.util.Properties -props.setProperty("driver", "org.sqlite.JDBC") -val predicates = Array( -"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'", -"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'") -spark.read.jdbc(url, tablename, predicates, props).count() // 510 +props.setProperty("driver", "com.mysql.jdbc.Driver") +props.setProperty("user", "root") +props.setProperty("password", "root") +val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定数据过滤条件 +spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() + +//输出: ++---------------+-----------+ +|help_keyword_id| name| ++---------------+-----------+ +| 0| <>| +| 1| ACTION| +| 2| ADD| +| 3|AES_DECRYPT| +| 4|AES_ENCRYPT| +| 5| AFTER| +| 6| AGAINST| +| 7| AGGREGATE| +| 8| ALGORITHM| +| 9| ALL| +| 604| WHEN| ++---------------+-----------+ ``` -并行读取数据: +可以使用`numPartitions`指定读取数据的并行度: ```scala -val dbDataFrame = spark.read.format("jdbc") -.option("url", url).option("dbtable", tablename).option("driver", driver) -.option("numPartitions", 10).load() +option("numPartitions", 10) ``` -在这里,我们对第一个分区和最后一个分区分别指定了最小值和最大值。任何超出这些界限的都在第一个分区或最后一个分区中。然后,我们设置希望的分区总数(这是并行度的级别)。 +在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。 ```scala -val colName = "count" -val lowerBound = 0L -val upperBound = 348113L // this is the max count in our database -val numPartitions = 10 -spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props) -.count() // 255 +val colName = "help_keyword_id" //用于判断上下界的列 +val lowerBound = 300L //下界 +val upperBound = 500L //上界 +val numPartitions = 10 //分区综述 +val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword", + colName,lowerBound,upperBound,numPartitions,props) ``` - - -### 6.3 写入数据 +想要验证分区内容,可以使用`mapPartitionsWithIndex`这个算子,代码如下: ```scala -val newPath = "jdbc:sqlite://tmp/my-sqlite.db" -csvFile.write.mode("overwrite").jdbc(newPath, tablename, props) +jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => { + val buffer = new ListBuffer[String] + while (iterator.hasNext) { + buffer.append(index + "分区:" + iterator.next()) + } + buffer.toIterator +}).foreach(println) ``` +执行结果如下:`help_keyword`这张表只有600条左右的数据,本来数据应该均匀分布在10个分区,但是0分区里面却有319条数据,这是因为设置了下限,所有小于300的数据都会被限制在第一个分区,即0分区。同理所有大于500的数据被分配在9分区,即最后一个分区。 + +
+ +### 6.2 写入数据 + +```scala +val df = spark.read.format("json").load("/usr/file/json/emp.json") +df.write +.format("jdbc") +.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") +.option("user", "root").option("password", "root") +.option("dbtable", "emp") +.save() +``` + +
+ ## 七、Text +text文件在读写性能方面并没有任何优势,且不能表达明确的数据结构,所以其使用的比较少,读写操作如下: + ### 7.1 读取Text数据 ```scala -spark.read.textFile("/data/flight-data/csv/2010-summary.csv") -.selectExpr("split(value, ',') as rows").show() +spark.read.textFile("/usr/file/txt/dept.txt").show() ``` ### 7.2 写入Text数据 ```scala -csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt") +df.write.text("/tmp/spark/txt/dept") ``` -## 八、数据读写高级概念 +
+ +## 八、数据读写高级特性 ### 8.1 并行读 -多个executors不能同时读取同一个文件,但它们可以同时读取不同的文件。一般来说,这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为DataFrame中的一个分区,并由可用的executors并行读取。 +多个executors不能同时读取同一个文件,但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为DataFrame中的一个分区,并由可用的executors并行读取。 ### 8.2 并行写 @@ -283,35 +399,42 @@ csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt") ### 8.3 分区写入 +分区和分桶这两个概念和Hive中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是`partitionBy`指定的分区和RDD中分区不是一个概念:这里的**分区表现为输出目录的子目录**,数据分别存储在对应的子目录中。 + ```scala -csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME") -.save("/tmp/partitioned-files.parquet") +val df = spark.read.format("json").load("/usr/file/json/emp.json") +df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions") ``` +输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。 + +
+ ### 8.3 分桶写入 +分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是Hive的分桶表。 + ```scala val numberBuckets = 10 -val columnToBucketBy = "count" -csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles") +val columnToBucketBy = "empno" +df.write.format("parquet").mode("overwrite") +.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles") ``` ### 8.5 文件大小管理 -如果写入产生大量小文件,这时会产生大量的元数据开销。Spark和HDFS一样,都不能很好的处理这个问题,这被称为“small file problem”。同时数据文件也不能过大,否则在查询时会有不必要的性能开销,因此要把文件大小控制在一个合理的范围内。 +如果写入产生小文件数量过多,这时会产生大量的元数据开销。Spark和HDFS一样,都不能很好的处理这个问题,这被称为“small file problem”。同时数据文件也不能过大,否则在查询时会有不必要的性能开销,因此要把文件大小控制在一个合理的范围内。 在上文我们已经介绍过可以通过分区数量来控制生成文件的数量,从而间接控制文件大小。Spark 2.2引入了一种新的方法,以更自动化的方式控制文件大小,这就是`maxRecordsPerFile`参数,它允许你通过控制写入文件的记录数来控制文件大小。 ```scala // Spark将确保文件最多包含5000条记录 - df.write.option(“maxRecordsPerFile”, 5000) +df.write.option(“maxRecordsPerFile”, 5000) ``` +
- - - -## 九、附录 +## 九、可选配置附录 ### 9.1 CSV读写可选配置 @@ -368,13 +491,12 @@ csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, column | createTableOptions | 写入数据时自定义创建表的相关配置 | | createTableColumnTypes | 写入数据时自定义创建列的列类型 | -> 完整的配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html - - +> 数据库读写更多配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html ## 参考资料 1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02 +2. https://spark.apache.org/docs/latest/sql-data-sources.html diff --git a/pictures/spark-mysql-分区上下限.png b/pictures/spark-mysql-分区上下限.png new file mode 100644 index 0000000..d93c611 Binary files /dev/null and b/pictures/spark-mysql-分区上下限.png differ diff --git a/pictures/spark-分区.png b/pictures/spark-分区.png new file mode 100644 index 0000000..f0e9075 Binary files /dev/null and b/pictures/spark-分区.png differ diff --git a/resources/csv/dept.csv b/resources/csv/dept.csv new file mode 100644 index 0000000..2a36e93 --- /dev/null +++ b/resources/csv/dept.csv @@ -0,0 +1,4 @@ +10,ACCOUNTING,NEW YORK +20,RESEARCH,DALLAS +30,SALES,CHICAGO +40,OPERATIONS,BOSTON diff --git a/resources/orc/dept.orc b/resources/orc/dept.orc new file mode 100644 index 0000000..3bb8d77 Binary files /dev/null and b/resources/orc/dept.orc differ diff --git a/resources/parquet/dept.parquet/part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet b/resources/parquet/dept.parquet similarity index 100% rename from resources/parquet/dept.parquet/part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet rename to resources/parquet/dept.parquet diff --git a/resources/parquet/dept.parquet/._SUCCESS.crc b/resources/parquet/dept.parquet/._SUCCESS.crc deleted file mode 100644 index 3b7b044..0000000 Binary files a/resources/parquet/dept.parquet/._SUCCESS.crc and /dev/null differ diff --git a/resources/parquet/dept.parquet/.part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet.crc b/resources/parquet/dept.parquet/.part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet.crc deleted file mode 100644 index 78cadc3..0000000 Binary files a/resources/parquet/dept.parquet/.part-00000-9dcc37bc-f4e8-4c2f-b2f4-b780aecfa9ad-c000.snappy.parquet.crc and /dev/null differ diff --git a/resources/parquet/dept.parquet/_SUCCESS b/resources/parquet/dept.parquet/_SUCCESS deleted file mode 100644 index e69de29..0000000 diff --git a/resources/parquet/emp.parquet/part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet b/resources/parquet/emp.parquet similarity index 100% rename from resources/parquet/emp.parquet/part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet rename to resources/parquet/emp.parquet diff --git a/resources/parquet/emp.parquet/._SUCCESS.crc b/resources/parquet/emp.parquet/._SUCCESS.crc deleted file mode 100644 index 3b7b044..0000000 Binary files a/resources/parquet/emp.parquet/._SUCCESS.crc and /dev/null differ diff --git a/resources/parquet/emp.parquet/.part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet.crc b/resources/parquet/emp.parquet/.part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet.crc deleted file mode 100644 index 2703216..0000000 Binary files a/resources/parquet/emp.parquet/.part-00000-d2881f1b-46f7-44d4-b8bc-0a768ae2ad66-c000.snappy.parquet.crc and /dev/null differ diff --git a/resources/parquet/emp.parquet/_SUCCESS b/resources/parquet/emp.parquet/_SUCCESS deleted file mode 100644 index e69de29..0000000