spark streaming 基本操作
This commit is contained in:
		| @@ -90,7 +90,7 @@ | |||||||
| **Spark Streaming :** | **Spark Streaming :** | ||||||
|  |  | ||||||
| 1. [Spark Streaming 简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming与流处理.md) | 1. [Spark Streaming 简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming与流处理.md) | ||||||
| 2. DStream常用操作详解 | 2. [Spark Streaming 基本操作](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming基本操作.md) | ||||||
| 3. [Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) | 3. [Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) | ||||||
| 4. [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md) | 4. [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,8 +1,24 @@ | |||||||
| # Spark Streaming 基本操作 | # Spark Streaming 基本操作 | ||||||
|  |  | ||||||
|  | <nav> | ||||||
|  | <a href="#一案例引入">一、案例引入</a><br/> | ||||||
|  |         <a href="#31-StreamingContext">3.1 StreamingContext</a><br/> | ||||||
|  |         <a href="#32-数据源">3.2 数据源</a><br/> | ||||||
|  |         <a href="#33-服务的启动与停止">3.3 服务的启动与停止</a><br/> | ||||||
|  | <a href="#二Transformation">二、Transformation</a><br/> | ||||||
|  |         <a href="#21-DStream与RDDs">2.1 DStream与RDDs</a><br/> | ||||||
|  |         <a href="#22-updateStateByKey">2.2 updateStateByKey</a><br/> | ||||||
|  |         <a href="#23-启动测试">2.3 启动测试</a><br/> | ||||||
|  | <a href="#三输出操作">三、输出操作</a><br/> | ||||||
|  |         <a href="#31-输出API">3.1 输出API</a><br/> | ||||||
|  |         <a href="#31-foreachRDD">3.1 foreachRDD</a><br/> | ||||||
|  |         <a href="#33-代码说明">3.3 代码说明</a><br/> | ||||||
|  |         <a href="#34-启动测试">3.4 启动测试</a><br/> | ||||||
|  | </nav> | ||||||
|  |  | ||||||
| ## 一、案例引入 | ## 一、案例引入 | ||||||
|  |  | ||||||
| 这里先引入一个基本的案例来演示流的创建:监听指定端口9999上的数据并进行词频统计。项目依赖和代码实现如下: | 这里先引入一个基本的案例来演示流的创建:获取指定端口上的数据并进行词频统计。项目依赖和代码实现如下: | ||||||
|  |  | ||||||
| ```xml | ```xml | ||||||
| <dependency> | <dependency> | ||||||
| @@ -36,8 +52,6 @@ object NetworkWordCount { | |||||||
| } | } | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 二、常用算子 |  | ||||||
|  |  | ||||||
| 使用本地模式启动Spark程序,然后使用`nc -lk 9999`打开端口并输入测试数据: | 使用本地模式启动Spark程序,然后使用`nc -lk 9999`打开端口并输入测试数据: | ||||||
|  |  | ||||||
| ```shell | ```shell | ||||||
| @@ -46,24 +60,24 @@ hello world hello spark hive hive hadoop | |||||||
| storm storm flink azkaban | storm storm flink azkaban | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 此时IDEA中控制台输出如下,可以看到已经接收到每一行数据并且进行了词频统计。 | 此时控制台输出如下,可以看到已经接收到数据并按行进行了词频统计。 | ||||||
|  |  | ||||||
|  | <div align="center"> <img  src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-word-count-v1.png"/> </div> | ||||||
|  |  | ||||||
| 下面我们针对示例代码进行讲解: | 下面针对示例代码进行讲解: | ||||||
|  |  | ||||||
| ### 3.1 StreamingContext | ### 3.1 StreamingContext | ||||||
|  |  | ||||||
| Spark Streaming编程的入口类是StreamingContext,在创建时候需要指明`sparkConf`和`batchDuration`(批次时间),Spark流处理本质是将流数据拆分为一个个批次,然后进行微批处理,batchDuration就是用于指定流数据将被分成批次的时间间隔。这个时间可以根据业务需求和服务器性能进行指定,如果业务要求低延迟,则这个时间可以指定得很短。 | Spark Streaming编程的入口类是StreamingContext,在创建时候需要指明`sparkConf`和`batchDuration`(批次时间),Spark流处理本质是将流数据拆分为一个个批次,然后进行微批处理,batchDuration就是用于指定将流数据拆成批次的时间间隔。这个时间可以根据业务需求和服务器性能进行指定,如果业务要求低延迟并且服务器性能也允许,则这个时间可以指定得很短。 | ||||||
|  |  | ||||||
| 这里需要注意的是:示例代码使用的是本地模式,配置为`local[2]`,这里不能配置为`local[1]`。这是因为对于流数据的处理,Spark必须有一个独立的Executor来接收数据,然后由其他的Executors来处理数据,所以为了保证数据能够被处理,至少要有2个Executors。这里我们的程序只有一个数据流,在并行读取多个数据流的时候,也要注意必须保证有足够的Executors来接收和处理数据。 | 这里需要注意的是:示例代码使用的是本地模式,配置为`local[2]`,这里不能配置为`local[1]`。这是因为对于流数据的处理,Spark必须有一个独立的Executor来接收数据,然后再由其他的Executors来处理,所以为了保证数据能够被处理,至少要有2个Executors。这里我们的程序只有一个数据流,在并行读取多个数据流的时候,也需要保证有足够的Executors来接收和处理数据。 | ||||||
|  |  | ||||||
| ### 3.2 数据源 | ### 3.2 数据源 | ||||||
|  |  | ||||||
| 在示例代码中使用的是`socketTextStream`来创建基于socket的数据流,实际上Spark还支持多种数据源,分为以下两类: | 在示例代码中使用的是`socketTextStream`来创建基于socket的数据流,实际上Spark还支持多种数据源,分为以下两类: | ||||||
|  |  | ||||||
| + 基本数据源:文件系统、socket的连接; | + 基本数据源:包括文件系统、socket连接等; | ||||||
| + 高级数据源:Kafka,Flume,Kinesis。 | + 高级数据源:包括Kafka,Flume,Kinesis等。 | ||||||
|  |  | ||||||
| 在基本数据源中,Spark支持对HDFS上指定目录进行监听,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下: | 在基本数据源中,Spark支持对HDFS上指定目录进行监听,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下: | ||||||
|  |  | ||||||
| @@ -84,17 +98,17 @@ streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirector | |||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| ## 二、updateStateByKey | ## 二、Transformation | ||||||
|  |  | ||||||
| ### 2.1 DStream与RDDs | ### 2.1 DStream与RDDs | ||||||
|  |  | ||||||
| DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。在内部,DStream由一系列连续的RDD表示。所以从本质上而言,应用于DStream的任何操作都会转换为底层RDD上的操作。例如,在示例代码中的flatMap算子操作实际上是作用在每个RDDs上(如下图)。因为这个原因,所以Spark Streaming能够支持RDD大多数的transformation算子。 | DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。在内部,DStream由一系列连续的RDD表示。所以从本质上而言,应用于DStream的任何操作都会转换为底层RDD上的操作。例如,在示例代码中flatMap算子的操作实际上是作用在每个RDDs上(如下图)。因为这个原因,所以DStream能够支持RDD的大部分transformation算子。 | ||||||
|  |  | ||||||
|  | <div align="center"> <img  src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-dstream-ops.png"/> </div> | ||||||
|  |  | ||||||
| ### 2.2 updateStateByKey | ### 2.2 updateStateByKey | ||||||
|  |  | ||||||
| 除了支持RDD上大多数的transformation算子外,DStream还有部分独有的算子,这当中比较重要的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,就需要依赖`updateStateByKey`算子。代码如下: | 除了能够支持RDD的大多数transformation算子外,DStream还有部分独有的算子,这当中比较重要的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下: | ||||||
|  |  | ||||||
| ```scala | ```scala | ||||||
| object NetworkWordCountV2 { | object NetworkWordCountV2 { | ||||||
| @@ -140,6 +154,8 @@ object NetworkWordCountV2 { | |||||||
|  |  | ||||||
| ### 2.3 启动测试 | ### 2.3 启动测试 | ||||||
|  |  | ||||||
|  | 在监听端口输入如下测试数据: | ||||||
|  |  | ||||||
| ```shell | ```shell | ||||||
| [root@hadoop001 ~]#  nc -lk 9999 | [root@hadoop001 ~]#  nc -lk 9999 | ||||||
| hello world hello spark hive hive hadoop | hello world hello spark hive hive hadoop | ||||||
| @@ -148,11 +164,11 @@ hello world hello spark hive hive hadoop | |||||||
| storm storm flink azkaban | storm storm flink azkaban | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 控制台输出如下,可以看到每一次输入的结果都被进行了累计求值。 | 此时控制台输出如下,所有输入都被进行了词频累计: | ||||||
|  |  | ||||||
|  | <div align="center"> <img  src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-word-count-v2.png"/> </div> | ||||||
|  |  | ||||||
| 同时查看在输出日志中还可以看到检查点操作的相关信息: | 同时在输出日志中还可以看到检查点操作的相关信息: | ||||||
|  |  | ||||||
| ```shell | ```shell | ||||||
| # 保存检查点信息 | # 保存检查点信息 | ||||||
| @@ -168,6 +184,8 @@ Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000 | |||||||
|  |  | ||||||
| ### 3.1 输出API | ### 3.1 输出API | ||||||
|  |  | ||||||
|  | Spark Streaming支持以下输出操作: | ||||||
|  |  | ||||||
| | Output Operation                            | Meaning                                                      | | | Output Operation                            | Meaning                                                      | | ||||||
| | :------------------------------------------ | :----------------------------------------------------------- | | | :------------------------------------------ | :----------------------------------------------------------- | | ||||||
| | **print**()                                 | 在运行流应用程序的driver节点上打印DStream中每个批次的前十个元素。用于开发调试。 | | | **print**()                                 | 在运行流应用程序的driver节点上打印DStream中每个批次的前十个元素。用于开发调试。 | | ||||||
| @@ -176,10 +194,11 @@ Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000 | |||||||
| | **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | | | **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | | ||||||
| | **foreachRDD**(*func*)                      | 最通用的输出方式,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。 | | | **foreachRDD**(*func*)                      | 最通用的输出方式,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。 | | ||||||
|  |  | ||||||
|  | 前面的四个API都是直接调用即可,下面主要讲解通用的输出方式`foreachRDD(func)`,通过该API你可以将数据保存到任何你需要的数据源。 | ||||||
|  |  | ||||||
| ### 3.1 foreachRDD | ### 3.1 foreachRDD | ||||||
|  |  | ||||||
| 这里我们使用Redis作为客户端,对文章开头示例程序进行改变,把每一次词频统计的结果写入到Redis,利用Redis的`HINCRBY`命令来进行总次数的统计。相关依赖和实现代码如下: | 这里我们使用Redis作为客户端,对文章开头示例程序进行改变,把每一次词频统计的结果写入到Redis,并利用Redis的`HINCRBY`命令来进行词频统计。这里需要导入Jedis依赖: | ||||||
|  |  | ||||||
| ```xml | ```xml | ||||||
| <dependency> | <dependency> | ||||||
| @@ -189,7 +208,7 @@ Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000 | |||||||
| </dependency> | </dependency> | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 实现代码如下: | 具体实现代码如下: | ||||||
|  |  | ||||||
| ```scala | ```scala | ||||||
| import org.apache.spark.SparkConf | import org.apache.spark.SparkConf | ||||||
| @@ -208,6 +227,7 @@ object NetworkWordCountToRedis { | |||||||
|     val lines = ssc.socketTextStream("hadoop001", 9999) |     val lines = ssc.socketTextStream("hadoop001", 9999) | ||||||
|     val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")) |     val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")) | ||||||
|       									.map(x => (x, 1)).reduceByKey(_ + _) |       									.map(x => (x, 1)).reduceByKey(_ + _) | ||||||
|  |      /*保存数据到Redis*/ | ||||||
|     pairs.foreachRDD { rdd => |     pairs.foreachRDD { rdd => | ||||||
|       rdd.foreachPartition { partitionOfRecords => |       rdd.foreachPartition { partitionOfRecords => | ||||||
|         var jedis: Jedis = null |         var jedis: Jedis = null | ||||||
| @@ -222,7 +242,6 @@ object NetworkWordCountToRedis { | |||||||
|         } |         } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     ssc.start() |     ssc.start() | ||||||
|     ssc.awaitTermination() |     ssc.awaitTermination() | ||||||
|   } |   } | ||||||
| @@ -263,7 +282,7 @@ public class JedisPoolUtil { | |||||||
|  |  | ||||||
| ### 3.3 代码说明 | ### 3.3 代码说明 | ||||||
|  |  | ||||||
| 这里将上面输出操作的代码单独抽取出来,并去除异常判断的部分,精简后的代码如下: | 这里将上面保存到Redis的代码单独抽取出来,并去除异常判断的部分。精简后的代码如下: | ||||||
|  |  | ||||||
| ```scala | ```scala | ||||||
| pairs.foreachRDD { rdd => | pairs.foreachRDD { rdd => | ||||||
| @@ -275,7 +294,7 @@ pairs.foreachRDD { rdd => | |||||||
| } | } | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 这里可以看到一共使用了三次循环,分别是循环RDD,循环分区,循环每条记录,上面我们的代码是在循环分区的时候获取连接,也就是为每一个分区获取一个连接。但是这里大家可能会有疑问:为什么不在循环RDD的时候,为每一个RDD获取一个连接,这样所需要的连接数更少。实际上这是不可以的,如果按照这种情况进行改写,如下: | 这里可以看到一共使用了三次循环,分别是循环RDD,循环分区,循环每条记录,上面我们的代码是在循环分区的时候获取连接,也就是为每一个分区获取一个连接。但是这里大家可能会有疑问:为什么不在循环RDD的时候,为每一个RDD获取一个连接,这样所需要的连接数可以更少。实际上这是不可行的,如果按照这种情况进行改写,如下: | ||||||
|  |  | ||||||
| ```scala | ```scala | ||||||
| pairs.foreachRDD { rdd => | pairs.foreachRDD { rdd => | ||||||
| @@ -287,13 +306,13 @@ pairs.foreachRDD { rdd => | |||||||
| } | } | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| 此时在执行时候就会抛出`Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis`,这是因为 | 此时在执行时候就会抛出`Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis`,这是因为在实际计算时,Spark会将对RDD操作分解为多个Task,Task则运行在具体的Worker Node上。在执行之前,Spark会对任务进行闭包,之后闭包被序列化并发送给每个Executor,这同时也是产生累加器和广播变量的原因。正式因为要执行序列化,而`Jedis`显然是不能被序列化的,所以会抛出异常。 | ||||||
|  |  | ||||||
| 第二个需要注意的是ConnectionPool最好是一个静态,惰性初始化连接池 。这是因为Spark的转换操作本生就是惰性的,且没有数据流时是不会触发写出操作,故出于性能考虑,连接池应该是惰性静态的,所以上面`JedisPool`在初始化时采用了懒汉式单例进行初始化。 | 第二个需要注意的是ConnectionPool最好是一个静态,惰性初始化连接池 。这是因为Spark的转换操作本身就是惰性的,且没有数据流时是不会触发写出操作,故出于性能考虑,连接池应该是惰性的,所以上面`JedisPool`在初始化时采用了懒汉式单例进行惰性初始化。 | ||||||
|  |  | ||||||
|  | ### 3.4 启动测试 | ||||||
|  |  | ||||||
|  | 在监听端口输入如下测试数据: | ||||||
| ### 3.3 启动测试 |  | ||||||
|  |  | ||||||
| ```shell | ```shell | ||||||
| [root@hadoop001 ~]#  nc -lk 9999 | [root@hadoop001 ~]#  nc -lk 9999 | ||||||
| @@ -305,7 +324,14 @@ storm storm flink azkaban | |||||||
|  |  | ||||||
| 使用Redis Manager查看写入结果(如下图),可以看到与使用`updateStateByKey`算子得到的计算结果相同。 | 使用Redis Manager查看写入结果(如下图),可以看到与使用`updateStateByKey`算子得到的计算结果相同。 | ||||||
|  |  | ||||||
|    | <div align="center"> <img  src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-word-count-v3.png"/> </div>   | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | > 本片文章所有源码见本仓库:[spark-streaming-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-basis) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | ## 参考资料 | ||||||
|  |  | ||||||
|  | 1. http://spark.apache.org/docs/latest/streaming-programming-guide.html | ||||||
		Reference in New Issue
	
	Block a user