From 5407561d44b4decf1714fc48adfa8e748f6b0f2b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com>
Date: Mon, 27 May 2019 13:43:49 +0800
Subject: [PATCH] spark streaming basis

---
 code/spark/spark-streaming-basis/pom.xml      | 12 ++++++
 .../java/com/heibaiying/JedisPoolUtils.scala  | 18 ---------
 .../com/heibaiying/NetworkWordCount.scala     | 10 ++---
 .../heibaiying/NetworkWordCountToRedis.scala  | 38 ++++++++++---------
 .../com/heibaiying/NetworkWordCountV2.scala   | 19 +++++++++-
 .../com/heibaiying/utils/JedisPoolUtil.java   | 24 ++++++------
 .../com/heibaiying/utils/JedisPoolUtil.scala  | 18 ---------
 7 files changed, 66 insertions(+), 73 deletions(-)
 delete mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala
 delete mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala

diff --git a/code/spark/spark-streaming-basis/pom.xml b/code/spark/spark-streaming-basis/pom.xml
index 19d03fd..759f030 100644
--- a/code/spark/spark-streaming-basis/pom.xml
+++ b/code/spark/spark-streaming-basis/pom.xml
@@ -8,5 +8,17 @@
     <artifactId>spark-streaming-basis</artifactId>
     <version>1.0</version>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.12</artifactId>
+            <version>2.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>2.9.0</version>
+        </dependency>
+    </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala
deleted file mode 100644
index 3745454..0000000
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.heibaiying
-
-import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
-
-object JedisPoolUtils {
-
-  /* create the Jedis connection pool */
-  val config = new JedisPoolConfig
-  config.setMaxTotal(30)
-  config.setMaxIdle(10)
-  val jedisPool = new JedisPool(config, "localhost", 6379)
-
-
-  def getConnection: Jedis = {
-    jedisPool.getResource
-  }
-
-}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala
index 297ab1d..0dba281 100644
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala
@@ -4,23 +4,23 @@ import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 /**
- *
+ * word count
  */
 object NetworkWordCount {
 
   def main(args: Array[String]) {
 
+    /* use a batch interval of 5 seconds */
     val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
-    /* set the batch interval */
-    val ssc = new StreamingContext(sparkConf, Seconds(1))
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
-    val lines = ssc.socketTextStream("hadoop001", 9999)
+    /* create the text input stream and do the word count */
+    val lines = ssc.socketTextStream("192.168.200.229", 9999)
     lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
 
     /* start the service */
     ssc.start()
-
     /* wait for the service to terminate */
     ssc.awaitTermination()
 
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala
index 3e1546d..8454cb8 100644
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala
@@ -1,43 +1,45 @@
 package com.heibaiying
 
+import com.heibaiying.utils.JedisPoolUtil
 import org.apache.spark.SparkConf
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.{Seconds, StreamingContext}
+import redis.clients.jedis.Jedis
 
 /**
  * word count
  */
-object NetworkWordCountV2 {
+object NetworkWordCountToRedis {
 
   def main(args: Array[String]) {
 
-    /*
-     * When testing locally it is best to set the Hadoop user name explicitly;
-     * otherwise the local user name is used by default, and creating directories
-     * on HDFS may then fail with a permission exception.
-     */
-    System.setProperty("HADOOP_USER_NAME", "root")
-
     /* use a batch interval of 5 seconds */
     val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
     val ssc = new StreamingContext(sparkConf, Seconds(5))
 
-    /* a checkpoint directory must be set */
-    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
-
     /* create the text input stream and do the word count */
     val lines = ssc.socketTextStream("192.168.200.229", 9999)
-    lines.flatMap(_.split(" ")).map(x => (x, 1))
-      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
-        val currentCount: Int = values.sum
-        val lastCount: Int = state.getOrElse(0)
-        Some(currentCount + lastCount)
-      })
-      .print()
+    val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
+
+    pairs.foreachRDD { rdd =>
+      rdd.foreachPartition { partitionOfRecords =>
+        var jedis: Jedis = null
+        try {
+          jedis = JedisPoolUtil.getConnection
+          partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+        } finally {
+          if (jedis != null) jedis.close()
+        }
+      }
+    }
 
     /* start the service */
     ssc.start()
     /* wait for the service to terminate */
     ssc.awaitTermination()
-
   }
 }
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala
index ce2e8fb..1adf684 100644
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala
@@ -4,20 +4,35 @@ import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 /**
- * word count
+ * word count (upgraded version)
 */
 object NetworkWordCountV2 {
 
   def main(args: Array[String]) {
 
+    /*
+     * When testing locally it is best to set the Hadoop user name explicitly;
+     * otherwise the local user name is used by default, and creating directories
+     * on HDFS may then fail with a permission exception.
+     */
+    System.setProperty("HADOOP_USER_NAME", "root")
+
     /* use a batch interval of 5 seconds */
     val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
     val ssc = new StreamingContext(sparkConf, Seconds(5))
 
+    /* a checkpoint directory must be set */
+    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
+
     /* create the text input stream and do the word count */
     val lines = ssc.socketTextStream("192.168.200.229", 9999)
-    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+    lines.flatMap(_.split(" ")).map(x => (x, 1))
+      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
+        val currentCount: Int = values.sum
+        val lastCount: Int = state.getOrElse(0)
+        Some(currentCount + lastCount)
+      })
+      .print()
 
     /* start the service */
     ssc.start()
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java
index 150a7e6..1948a3b 100644
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java
@@ -1,30 +1,30 @@
 package com.heibaiying.utils;
 
+import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.JedisPoolConfig;
 
 public class JedisPoolUtil {
 
-    // must be declared volatile to prevent instruction reordering
-    private static volatile JedisPool JedisPool = null;
+    /* declared volatile to prevent instruction reordering */
+    private static volatile JedisPool jedisPool = null;
 
-    private JedisPoolUtil() {
-        if (JedisPool != null) {
-            throw new RuntimeException("Reflective instantiation of the singleton is forbidden!");
-        }
-    }
+    private static final String HOST = "localhost";
+    private static final int PORT = 6379;
 
-    public static JedisPool getConnect() {
-        if (JedisPool == null) {
+
+    /* lazily initialized singleton using double-checked locking */
+    public static Jedis getConnection() {
+        if (jedisPool == null) {
             synchronized (JedisPoolUtil.class) {
-                if (JedisPool != null) {
+                if (jedisPool == null) {
                     JedisPoolConfig config = new JedisPoolConfig();
                     config.setMaxTotal(30);
                     config.setMaxIdle(10);
-                    JedisPool jedisPool = new JedisPool(config, "localhost", 6379);
+                    jedisPool = new JedisPool(config, HOST, PORT);
                 }
             }
         }
-        return JedisPool;
+        return jedisPool.getResource();
     }
 }
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala
deleted file mode 100644
index 6775019..0000000
--- a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.heibaiying.utils
-
-import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
-
-object JedisPoolUtil {
-
-  /* create the Jedis connection pool */
-  val config = new JedisPoolConfig
-  config.setMaxTotal(30)
-  config.setMaxIdle(10)
-  val jedisPool = new JedisPool(config, "localhost", 6379)
-
-
-  def getConnection: Jedis = {
-    jedisPool.getResource
-  }
-
-}
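
NetworkWordCountToRedis accumulates its counts with HINCRBY into the Redis hash "wordCount", so the results survive across batches and can be read back from any Redis client. The sketch below is a minimal, hypothetical verification helper and is not part of this patch: it borrows a connection from the JedisPoolUtil added above and dumps the accumulated counts. The object name PrintRedisWordCount is assumed purely for illustration.

package com.heibaiying

import scala.collection.JavaConverters._

import com.heibaiying.utils.JedisPoolUtil
import redis.clients.jedis.Jedis

/* Hypothetical helper, not part of this patch: prints the word counts that
 * NetworkWordCountToRedis has accumulated in the "wordCount" Redis hash. */
object PrintRedisWordCount {

  def main(args: Array[String]): Unit = {
    var jedis: Jedis = null
    try {
      /* borrow a connection from the same pool the streaming job uses */
      jedis = JedisPoolUtil.getConnection
      /* hgetAll returns a java.util.Map[String, String] */
      val counts = jedis.hgetAll("wordCount").asScala
      counts.toSeq
        .sortBy { case (_, count) => -count.toLong }
        .foreach { case (word, count) => println(s"$word -> $count") }
    } finally {
      if (jedis != null) jedis.close()
    }
  }
}

With the streaming job running and words typed into a socket source started with nc -lk 9999 on 192.168.200.229, the same counts can also be inspected directly with redis-cli using HGETALL wordCount.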