From ae0d2664f5072fee80f22a571d6204f2ff7e84b0 Mon Sep 17 00:00:00 2001 From: luoxiang <2806718453@qq.com> Date: Tue, 4 Jun 2019 21:23:54 +0800 Subject: [PATCH] =?UTF-8?q?spark=20streaming=20=E5=9F=BA=E6=9C=AC=E6=93=8D?= =?UTF-8?q?=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- notes/Spark_Streaming基本操作.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/notes/Spark_Streaming基本操作.md b/notes/Spark_Streaming基本操作.md index a96e17c..a9bd647 100644 --- a/notes/Spark_Streaming基本操作.md +++ b/notes/Spark_Streaming基本操作.md @@ -70,16 +70,16 @@ storm storm flink azkaban ### 3.1 StreamingContext -Spark Streaming编程的入口类是StreamingContext,在创建时候需要指明`sparkConf`和`batchDuration`(批次时间),Spark流处理本质是将流数据拆分为一个个批次,然后进行微批处理,batchDuration就是用于指定将流数据拆成批次的时间间隔。这个时间可以根据业务需求和服务器性能进行指定,如果业务要求低延迟并且服务器性能也允许,则这个时间可以指定得很短。 +Spark Streaming编程的入口类是StreamingContext,在创建时候需要指明`sparkConf`和`batchDuration`(批次时间),Spark流处理本质是将流数据拆分为一个个批次,然后进行微批处理,`batchDuration`就是批次拆分的时间间隔。这个时间可以根据业务需求和服务器性能进行指定,如果业务要求低延迟并且服务器性能也允许,则这个时间可以指定得很短。 这里需要注意的是:示例代码使用的是本地模式,配置为`local[2]`,这里不能配置为`local[1]`。这是因为对于流数据的处理,Spark必须有一个独立的Executor来接收数据,然后再由其他的Executors来处理,所以为了保证数据能够被处理,至少要有2个Executors。这里我们的程序只有一个数据流,在并行读取多个数据流的时候,也需要保证有足够的Executors来接收和处理数据。 ### 3.2 数据源 -在示例代码中使用的是`socketTextStream`来创建基于socket的数据流,实际上Spark还支持多种数据源,分为以下两类: +在示例代码中使用的是`socketTextStream`来创建基于Socket的数据流,实际上Spark还支持多种数据源,分为以下两类: -+ 基本数据源:包括文件系统、socket连接等; -+ 高级数据源:包括Kafka,Flume,Kinesis等。 ++ **基本数据源**:包括文件系统、Socket连接等; ++ **高级数据源**:包括Kafka,Flume,Kinesis等。 在基本数据源中,Spark支持监听HDFS上指定目录,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下: @@ -92,7 +92,7 @@ streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirector 被监听的目录可以是具体目录,如`hdfs://host:8040/logs/`;也可以使用通配符,如`hdfs://host:8040/logs/2017/*`。 -> 关于高级数据源的整合单独整理至:[Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) 和 [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md) +> 关于高级数据源的整合单独整理至:[Spark Streaming整合Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md) 和 [Spark Streaming整合Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md) ### 3.3 服务的启动与停止 @@ -104,13 +104,13 @@ streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirector ### 2.1 DStream与RDDs -DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。在内部,DStream由一系列连续的RDD表示。所以从本质上而言,应用于DStream的任何操作都会转换为底层RDD上的操作。例如,在示例代码中flatMap算子的操作实际上是作用在每个RDDs上(如下图)。因为这个原因,所以DStream能够支持RDD的大部分transformation算子。 +DStream是Spark Streaming提供的基本抽象。它表示连续的数据流。在内部,DStream由一系列连续的RDD表示。所以从本质上而言,应用于DStream的任何操作都会转换为底层RDD上的操作。例如,在示例代码中flatMap算子的操作实际上是作用在每个RDDs上(如下图)。因为这个原因,所以DStream能够支持RDD大部分的*transformation*算子。
### 2.2 updateStateByKey -除了能够支持RDD的算子外,DStream还有部分独有的transformation算子,这当中比较常用的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下: +除了能够支持RDD的算子外,DStream还有部分独有的*transformation*算子,这当中比较常用的是`updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用`updateStateByKey`算子。代码如下: ```scala object NetworkWordCountV2 { @@ -295,7 +295,7 @@ pairs.foreachRDD { rdd => } ``` -这里可以看到一共使用了三次循环,分别是循环RDD,循环分区,循环每条记录,上面我们的代码是在循环分区的时候获取连接,也就是为每一个分区获取一个连接。但是这里大家可能会有疑问:为什么不在循环RDD的时候,为每一个RDD获取一个连接,这样所需要的连接数可以更少。实际上这是不可行的,如果按照这种情况进行改写,如下: +这里可以看到一共使用了三次循环,分别是循环RDD,循环分区,循环每条记录,上面我们的代码是在循环分区的时候获取连接,也就是为每一个分区获取一个连接。但是这里大家可能会有疑问:为什么不在循环RDD的时候,为每一个RDD获取一个连接,这样所需要的连接数会更少。实际上这是不可行的,如果按照这种情况进行改写,如下: ```scala pairs.foreachRDD { rdd => @@ -307,9 +307,9 @@ pairs.foreachRDD { rdd => } ``` -此时在执行时候就会抛出`Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis`,这是因为在实际计算时,Spark会将对RDD操作分解为多个Task,Task则运行在具体的Worker Node上。在执行之前,Spark会对任务进行闭包,之后闭包被序列化并发送给每个Executor,这同时也是产生累加器和广播变量的原因。正式因为要执行序列化,而`Jedis`显然是不能被序列化的,所以会抛出异常。 +此时在执行时候就会抛出`Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis`,这是因为在实际计算时,Spark会将对RDD操作分解为多个Task,Task运行在具体的Worker Node上。在执行之前,Spark会对任务进行闭包,之后闭包被序列化并发送给每个Executor,而`Jedis`显然是不能被序列化的,所以会抛出异常。 -第二个需要注意的是ConnectionPool最好是一个静态,惰性初始化连接池 。这是因为Spark的转换操作本身就是惰性的,且没有数据流时是不会触发写出操作,故出于性能考虑,连接池应该是惰性的,所以上面`JedisPool`在初始化时采用了懒汉式单例进行惰性初始化。 +第二个需要注意的是ConnectionPool最好是一个静态,惰性初始化连接池 。这是因为Spark的转换操作本身就是惰性的,且没有数据流时不会触发写出操作,所以出于性能考虑,连接池应该是惰性的,因此上面`JedisPool`在初始化时采用了懒汉式单例进行惰性初始化。 ### 3.4 启动测试 @@ -335,4 +335,4 @@ storm storm flink azkaban ## 参考资料 -1. http://spark.apache.org/docs/latest/streaming-programming-guide.html \ No newline at end of file +Spark官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html \ No newline at end of file