From f891a9ae74ee06b185e32ec995a2b3a6ab5ae760 Mon Sep 17 00:00:00 2001 From: luoxiang <2806718453@qq.com> Date: Mon, 27 May 2019 22:00:57 +0800 Subject: [PATCH] modify --- notes/Spark_Streaming基本操作.md | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/notes/Spark_Streaming基本操作.md b/notes/Spark_Streaming基本操作.md index efa0e69..a96e17c 100644 --- a/notes/Spark_Streaming基本操作.md +++ b/notes/Spark_Streaming基本操作.md @@ -64,6 +64,8 @@ storm storm flink azkaban
+
+ 下面针对示例代码进行讲解: ### 3.1 StreamingContext @@ -79,7 +81,7 @@ Spark Streaming编程的入口类是StreamingContext,在创建时候需要指 + 基本数据源:包括文件系统、socket连接等; + 高级数据源:包括Kafka,Flume,Kinesis等。 -在基本数据源中,Spark支持对HDFS上指定目录进行监听,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下: +在基本数据源中,Spark支持监听HDFS上指定目录,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下: ```scala // 对于文本文件,指明监听目录即可 @@ -88,13 +90,13 @@ streamingContext.textFileStream(dataDirectory) streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) ``` -指定的目录时,可以是具体的目录,如`hdfs://namenode:8040/logs/`;也可以使用通配符,如`hdfs://namenode:8040/logs/2017/*`。 +被监听的目录可以是具体目录,如`hdfs://host:8040/logs/`;也可以使用通配符,如`hdfs://host:8040/logs/2017/*`。 > 关于高级数据源的整合单独整理至:[Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) 和 [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md) ### 3.3 服务的启动与停止 -在示例代码中,使用`streamingContext.start()`代表启动服务,此时还要使`streamingContext.awaitTermination()`使服务处于等待和可用的状态,直到发生异常或者手动使用`streamingContext.stop()`进行终止。 +在示例代码中,使用`streamingContext.start()`代表启动服务,此时还要使用`streamingContext.awaitTermination()`使服务处于等待和可用的状态,直到发生异常或者手动使用`streamingContext.stop()`进行终止。 @@ -108,7 +110,7 @@ DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。 ### 2.2 updateStateByKey -除了能够支持RDD的大多数transformation算子外,DStream还有部分独有的算子,这当中比较重要的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下: +除了能够支持RDD的算子外,DStream还有部分独有的transformation算子,这当中比较常用的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下: ```scala object NetworkWordCountV2 { @@ -150,7 +152,7 @@ object NetworkWordCountV2 { } ``` -使用`updateStateByKey`算子,你必须使用` ssc.checkpoint()`设置检查点,这样当使用`updateStateByKey`算子时,它会去检查点中取出上一次保存的信息,并使用自定义的`updateFunction`函数将上一次的数据和本次数据进行相加,然后返回。 +使用`updateStateByKey`算子,你必须使用`ssc.checkpoint()`设置检查点,这样当使用`updateStateByKey`算子时,它会去检查点中取出上一次保存的信息,并使用自定义的`updateFunction`函数将上一次的数据和本次数据进行相加,然后返回。 ### 2.3 启动测试 @@ -190,8 +192,8 @@ Spark Streaming支持以下输出操作: | :------------------------------------------ | :----------------------------------------------------------- | | **print**() | 在运行流应用程序的driver节点上打印DStream中每个批次的前十个元素。用于开发调试。 | | **saveAsTextFiles**(*prefix*, [*suffix*]) | 将DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | -| **saveAsObjectFiles**(*prefix*, [*suffix*]) | 将此DStream的内容序列化为Java对象,并保存到SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | -| **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | +| **saveAsObjectFiles**(*prefix*, [*suffix*]) | 将DStream的内容序列化为Java对象,并保存到SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | +| **saveAsHadoopFiles**(*prefix*, [*suffix*]) | 将DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 | | **foreachRDD**(*func*) | 最通用的输出方式,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。 | 前面的四个API都是直接调用即可,下面主要讲解通用的输出方式`foreachRDD(func)`,通过该API你可以将数据保存到任何你需要的数据源。 @@ -225,8 +227,7 @@ object NetworkWordCountToRedis { /*创建文本输入流,并进行词频统计*/ val lines = ssc.socketTextStream("hadoop001", 9999) - val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")) - .map(x => (x, 1)).reduceByKey(_ + _) + val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _) /*保存数据到Redis*/ pairs.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => @@ -326,7 +327,7 @@ storm storm flink azkaban
- +
> 本片文章所有源码见本仓库:[spark-streaming-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-basis)