From 79228ff9ef6bceb3bf8332b8139b0a43fb058fd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Mon, 27 May 2019 13:43:37 +0800 Subject: [PATCH] spark streaming basis --- code/spark/spark-streaming-basis/pom.xml | 12 ++++++ .../java/com/heibaiying/JedisPoolUtils.scala | 18 ++++++++ .../com/heibaiying/NetworkWordCount.scala | 28 ++++++++++++ .../heibaiying/NetworkWordCountToRedis.scala | 43 +++++++++++++++++++ .../com/heibaiying/NetworkWordCountV2.scala | 28 ++++++++++++ .../com/heibaiying/utils/JedisPoolUtil.java | 30 +++++++++++++ .../com/heibaiying/utils/JedisPoolUtil.scala | 18 ++++++++ 7 files changed, 177 insertions(+) create mode 100644 code/spark/spark-streaming-basis/pom.xml create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java create mode 100644 code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala diff --git a/code/spark/spark-streaming-basis/pom.xml b/code/spark/spark-streaming-basis/pom.xml new file mode 100644 index 0000000..19d03fd --- /dev/null +++ b/code/spark/spark-streaming-basis/pom.xml @@ -0,0 +1,12 @@ + + + 4.0.0 + + com.heibaiying + spark-streaming-basis + 1.0 + + + \ No newline at end of file diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala new file mode 100644 index 0000000..3745454 --- /dev/null +++ 
b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/JedisPoolUtils.scala
@@ -0,0 +1,25 @@
+package com.heibaiying
+
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+
+/**
+ * Holds a single Jedis connection pool that is created when this object is first initialised.
+ */
+object JedisPoolUtils {
+
+  /* Pool settings: at most 30 connections in total, at most 10 of them idle. */
+  val config: JedisPoolConfig = new JedisPoolConfig
+  config.setMaxTotal(30)
+  config.setMaxIdle(10)
+
+  /* Connection pool for the Redis server at localhost:6379. */
+  val jedisPool: JedisPool = new JedisPool(config, "localhost", 6379)
+
+  /**
+   * Borrows a connection from the pool.
+   *
+   * @return a Jedis connection obtained via `jedisPool.getResource`
+   */
+  def getConnection: Jedis = jedisPool.getResource
+
+}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala
new file mode 100644
index 0000000..297ab1d
--- /dev/null
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCount.scala
@@ -0,0 +1,33 @@
+package com.heibaiying
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Word count over a socket text stream, computed independently for each 1-second batch.
+ */
+object NetworkWordCount {
+
+  /**
+   * Entry point: builds the streaming job, starts it and blocks until it terminates.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
+    /* Batch interval of one second */
+    val ssc = new StreamingContext(sparkConf, Seconds(1))
+
+    /* Read lines from hadoop001:9999, split them on spaces and count each word per batch */
+    val lines = ssc.socketTextStream("hadoop001", 9999)
+    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    /* Start the streaming computation */
+    ssc.start()
+
+    /* Block until the computation is stopped */
+    ssc.awaitTermination()
+
+  }
+}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala
new file mode 100644
index 0000000..3e1546d
--- /dev/null
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountToRedis.scala
@@ -0,0 +1,54 @@
+package com.heibaiying
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Cumulative word count over a socket text stream: per-word totals are carried across
+ * batches with `updateStateByKey`.
+ *
+ * Named after its file so that it does not collide with the `NetworkWordCountV2` object
+ * defined by NetworkWordCountV2.scala in the same package.
+ * NOTE(review): despite the name, nothing is written to Redis here yet.
+ */
+object NetworkWordCountToRedis {
+
+  /**
+   * Entry point: builds the streaming job, starts it and blocks until it terminates.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    /*
+     * For local testing it is best to set the Hadoop user name explicitly; otherwise the
+     * local machine's user name is used by default, and creating directories on HDFS
+     * may then fail with a permission error.
+     */
+    System.setProperty("HADOOP_USER_NAME", "root")
+
+    /* Batch interval of 5 seconds */
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+
+    /* A checkpoint directory must be set (required by the stateful updateStateByKey below) */
+    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
+
+    /* Create the text input stream and keep a running count for each word */
+    val lines = ssc.socketTextStream("192.168.200.229", 9999)
+    lines.flatMap(_.split(" ")).map(x => (x, 1))
+      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
+        /* New total = occurrences in this batch + total carried over from earlier batches */
+        val currentCount: Int = values.sum
+        val lastCount: Int = state.getOrElse(0)
+        Some(currentCount + lastCount)
+      })
+      .print()
+
+    /* Start the streaming computation */
+    ssc.start()
+    /* Block until the computation is stopped */
+    ssc.awaitTermination()
+
+  }
+}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala
new file mode 100644
index 0000000..ce2e8fb
--- /dev/null
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/NetworkWordCountV2.scala
@@ -0,0 +1,32 @@
+package com.heibaiying
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Word count over a socket text stream, computed independently for each 5-second batch.
+ */
+object NetworkWordCountV2 {
+
+  /**
+   * Entry point: builds the streaming job, starts it and blocks until it terminates.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    /* Batch interval of 5 seconds */
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+
+    /* Create the text input stream and count the words of each batch */
+    val lines = ssc.socketTextStream("192.168.200.229", 9999)
+    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    /* Start the streaming computation */
+    ssc.start()
+    /* Block until the computation is stopped */
+    ssc.awaitTermination()
+
+  }
+}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java
new file mode 100644
index 0000000..150a7e6
--- /dev/null
+++ 
b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.java
@@ -0,0 +1,43 @@
+package com.heibaiying.utils;
+
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+/**
+ * Lazily creates and shares a single {@link JedisPool} for the Redis server at localhost:6379.
+ */
+public class JedisPoolUtil {
+
+    // Must be declared volatile to prevent instruction reordering
+    private static volatile JedisPool jedisPool = null;
+
+    /**
+     * Private constructor; throws if it is invoked (e.g. via reflection) once the pool exists.
+     */
+    private JedisPoolUtil() {
+        if (jedisPool != null) {
+            throw new RuntimeException("单例模式禁止反射调用!");
+        }
+    }
+
+    /**
+     * Returns the shared pool, creating it on first use.
+     *
+     * @return the shared {@link JedisPool}
+     */
+    public static JedisPool getConnect() {
+        if (jedisPool == null) {
+            synchronized (JedisPoolUtil.class) {
+                // Re-check under the lock: create the pool only if it is still missing, and
+                // store it in the shared field so that this and later calls return it.
+                if (jedisPool == null) {
+                    JedisPoolConfig config = new JedisPoolConfig();
+                    config.setMaxTotal(30);
+                    config.setMaxIdle(10);
+                    jedisPool = new JedisPool(config, "localhost", 6379);
+                }
+            }
+        }
+        return jedisPool;
+    }
+}
diff --git a/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala
new file mode 100644
index 0000000..6775019
--- /dev/null
+++ b/code/spark/spark-streaming-basis/src/main/java/com/heibaiying/utils/JedisPoolUtil.scala
@@ -0,0 +1,28 @@
+package com.heibaiying.utils
+
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+
+/**
+ * Holds a single Jedis connection pool that is created when this object is first initialised.
+ *
+ * NOTE(review): JedisPoolUtil.java declares a class with the same name in this package;
+ * the two definitions presumably conflict when compiled together - confirm and keep one.
+ */
+object JedisPoolUtil {
+
+  /* Pool settings: at most 30 connections in total, at most 10 of them idle. */
+  val config: JedisPoolConfig = new JedisPoolConfig
+  config.setMaxTotal(30)
+  config.setMaxIdle(10)
+
+  /* Connection pool for the Redis server at localhost:6379. */
+  val jedisPool: JedisPool = new JedisPool(config, "localhost", 6379)
+
+  /**
+   * Borrows a connection from the pool.
+   *
+   * @return a Jedis connection obtained via `jedisPool.getResource`
+   */
+  def getConnection: Jedis = jedisPool.getResource
+
+}