This commit is contained in:
luoxiang 2019-05-27 22:00:57 +08:00
parent 057fd2c589
commit f891a9ae74

View File

@ -64,6 +64,8 @@ storm storm flink azkaban
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-word-count-v1.png"/> </div>
<br/>
下面针对示例代码进行讲解:
### 3.1 StreamingContext
@ -79,7 +81,7 @@ Spark Streaming编程的入口类是StreamingContext在创建时候需要指
+ 基本数据源包括文件系统、socket连接等
+ 高级数据源包括KafkaFlumeKinesis等。
在基本数据源中Spark支持对HDFS上指定目录进行监听,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下:
在基本数据源中Spark支持监听HDFS上指定目录,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下:
```scala
// 对于文本文件,指明监听目录即可
@ -88,13 +90,13 @@ streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
```
指定的目录时,可以是具体的目录,如`hdfs://namenode:8040/logs/`;也可以使用通配符,如`hdfs://namenode:8040/logs/2017/*`。
被监听的目录可以是具体目录,如`hdfs://host:8040/logs/`;也可以使用通配符,如`hdfs://host:8040/logs/2017/*`。
> 关于高级数据源的整合单独整理至:[Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) 和 [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md)
### 3.3 服务的启动与停止
在示例代码中,使用`streamingContext.start()`代表启动服务,此时还要使`streamingContext.awaitTermination()`使服务处于等待和可用的状态,直到发生异常或者手动使用`streamingContext.stop()`进行终止。
在示例代码中,使用`streamingContext.start()`代表启动服务,此时还要使`streamingContext.awaitTermination()`使服务处于等待和可用的状态,直到发生异常或者手动使用`streamingContext.stop()`进行终止。
@ -108,7 +110,7 @@ DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。
### 2.2 updateStateByKey
除了能够支持RDD的大多数transformation算子外DStream还有部分独有的算子这当中比较重要的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下:
除了能够支持RDD的算子外DStream还有部分独有的transformation算子这当中比较常用的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下:
```scala
object NetworkWordCountV2 {
@ -150,7 +152,7 @@ object NetworkWordCountV2 {
}
```
使用`updateStateByKey`算子,你必须使用` ssc.checkpoint()`设置检查点,这样当使用`updateStateByKey`算子时,它会去检查点中取出上一次保存的信息,并使用自定义的`updateFunction`函数将上一次的数据和本次数据进行相加,然后返回。
使用`updateStateByKey`算子,你必须使用`ssc.checkpoint()`设置检查点,这样当使用`updateStateByKey`算子时,它会去检查点中取出上一次保存的信息,并使用自定义的`updateFunction`函数将上一次的数据和本次数据进行相加,然后返回。
### 2.3 启动测试
@ -190,8 +192,8 @@ Spark Streaming支持以下输出操作
| :------------------------------------------ | :----------------------------------------------------------- |
| **print**() | 在运行流应用程序的driver节点上打印DStream中每个批次的前十个元素。用于开发调试。 |
| **saveAsTextFiles**(*prefix*, [*suffix*]) | 将DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀生成“prefix-TIME_IN_MS [.suffix]”。 |
| **saveAsObjectFiles**(*prefix*, [*suffix*]) | 将DStream的内容序列化为Java对象并保存到SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成“prefix-TIME_IN_MS [.suffix]”。 |
| **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成“prefix-TIME_IN_MS [.suffix]”。 |
| **saveAsObjectFiles**(*prefix*, [*suffix*]) | 将DStream的内容序列化为Java对象并保存到SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成“prefix-TIME_IN_MS [.suffix]”。 |
| **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成“prefix-TIME_IN_MS [.suffix]”。 |
| **foreachRDD**(*func*) | 最通用的输出方式它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统例如将RDD保存到文件或通过网络将其写入数据库。 |
前面的四个API都是直接调用即可下面主要讲解通用的输出方式`foreachRDD(func)`通过该API你可以将数据保存到任何你需要的数据源。
@ -225,8 +227,7 @@ object NetworkWordCountToRedis {
/*创建文本输入流,并进行词频统计*/
val lines = ssc.socketTextStream("hadoop001", 9999)
val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" "))
.map(x => (x, 1)).reduceByKey(_ + _)
val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
/*保存数据到Redis*/
pairs.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
@ -326,7 +327,7 @@ storm storm flink azkaban
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-streaming-word-count-v3.png"/> </div>
<br/>
> 本片文章所有源码见本仓库:[spark-streaming-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-basis)