spark streaming basis
This commit is contained in:
		@@ -1,18 +0,0 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
 | 
			
		||||
 | 
			
		||||
/**
  * Holder for a single Jedis connection pool shared by everything in this JVM.
  *
  * The pool is created when this object is initialised and points at a
  * Redis server on localhost:6379.
  */
object JedisPoolUtils {

  /* Pool settings: maxTotal = 30, maxIdle = 10. */
  val config: JedisPoolConfig = {
    val poolSettings = new JedisPoolConfig()
    poolSettings.setMaxTotal(30)
    poolSettings.setMaxIdle(10)
    poolSettings
  }

  /* The shared pool, built from the settings above. */
  val jedisPool: JedisPool = new JedisPool(config, "localhost", 6379)

  /**
    * Borrows a connection from the shared pool.
    *
    * @return the [[redis.clients.jedis.Jedis]] instance handed out by `jedisPool.getResource`
    */
  def getConnection: Jedis = jedisPool.getResource

}
 | 
			
		||||
@@ -4,23 +4,23 @@ import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  *
 | 
			
		||||
  * 词频统计
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCount {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    /*指定时间间隔为5s*/
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    /*指定时间间隔*/
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(1))
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 | 
			
		||||
 | 
			
		||||
    val lines = ssc.socketTextStream("hadoop001", 9999)
 | 
			
		||||
    /*创建文本输入流,并进行词频统计*/
 | 
			
		||||
    val lines = ssc.socketTextStream("192.168.200.229", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
 | 
			
		||||
    /*等待服务结束*/
 | 
			
		||||
    ssc.awaitTermination()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,43 +1,45 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import com.heibaiying.utils.JedisPoolUtil
 | 
			
		||||
import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.dstream.DStream
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
import redis.clients.jedis.Jedis
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  * 词频统计
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCountV2 {
 | 
			
		||||
object NetworkWordCountToRedis {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
     * 本地测试时最好指定hadoop用户名,否则会默认使用本地电脑的用户名,
 | 
			
		||||
     * 此时在HDFS上创建目录时可能会抛出权限不足的异常
 | 
			
		||||
     */
 | 
			
		||||
    System.setProperty("HADOOP_USER_NAME", "root")
 | 
			
		||||
 | 
			
		||||
    /*指定时间间隔为5s*/
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 | 
			
		||||
 | 
			
		||||
    /*必须要设置检查点*/
 | 
			
		||||
    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
 | 
			
		||||
 | 
			
		||||
    /*创建文本输入流,并进行词频统计*/
 | 
			
		||||
    val lines = ssc.socketTextStream("192.168.200.229", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1))
 | 
			
		||||
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
 | 
			
		||||
        val currentCount: Int = values.sum
 | 
			
		||||
        val lastCount: Int = state.getOrElse(0)
 | 
			
		||||
        Some(currentCount + lastCount)
 | 
			
		||||
      })
 | 
			
		||||
      .print()
 | 
			
		||||
    val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
 | 
			
		||||
 | 
			
		||||
    pairs.foreachRDD { rdd =>
 | 
			
		||||
      rdd.foreachPartition { partitionOfRecords =>
 | 
			
		||||
        var jedis: Jedis = null
 | 
			
		||||
        try {
 | 
			
		||||
          jedis = JedisPoolUtil.getConnection
 | 
			
		||||
          partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
 | 
			
		||||
        } catch {
 | 
			
		||||
          case ex: Exception =>
 | 
			
		||||
            ex.printStackTrace()
 | 
			
		||||
        } finally {
 | 
			
		||||
          if (jedis != null) jedis.close()
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
    /*等待服务结束*/
 | 
			
		||||
    ssc.awaitTermination()
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -4,20 +4,35 @@ import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  * 词频统计
 | 
			
		||||
  * 词频统计升级版
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCountV2 {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
     * 本地测试时最好指定hadoop用户名,否则会默认使用本地电脑的用户名,
 | 
			
		||||
     * 此时在HDFS上创建目录时可能会抛出权限不足的异常
 | 
			
		||||
     */
 | 
			
		||||
    System.setProperty("HADOOP_USER_NAME", "root")
 | 
			
		||||
 | 
			
		||||
    /*指定时间间隔为5s*/
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 | 
			
		||||
 | 
			
		||||
    /*必须要设置检查点*/
 | 
			
		||||
    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
 | 
			
		||||
 | 
			
		||||
    /*创建文本输入流,并进行词频统计*/
 | 
			
		||||
    val lines = ssc.socketTextStream("192.168.200.229", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1))
 | 
			
		||||
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
 | 
			
		||||
        val currentCount: Int = values.sum
 | 
			
		||||
        val lastCount: Int = state.getOrElse(0)
 | 
			
		||||
        Some(currentCount + lastCount)
 | 
			
		||||
      })
 | 
			
		||||
      .print()
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
 
 | 
			
		||||
@@ -1,30 +1,30 @@
 | 
			
		||||
package com.heibaiying.utils;
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.Jedis;
 | 
			
		||||
import redis.clients.jedis.JedisPool;
 | 
			
		||||
import redis.clients.jedis.JedisPoolConfig;
 | 
			
		||||
 | 
			
		||||
public class JedisPoolUtil {
 | 
			
		||||
 | 
			
		||||
    // 必须要声明为 volatile 防止指令重排序
 | 
			
		||||
    private static volatile JedisPool JedisPool = null;
 | 
			
		||||
    /* 声明为volatile防止指令重排序 */
 | 
			
		||||
    private static volatile JedisPool jedisPool = null;
 | 
			
		||||
 | 
			
		||||
    private JedisPoolUtil() {
 | 
			
		||||
        if (JedisPool != null) {
 | 
			
		||||
            throw new RuntimeException("单例模式禁止反射调用!");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    private static final String HOST = "localhost";
 | 
			
		||||
    private static final int PORT = 6379;
 | 
			
		||||
 | 
			
		||||
    public static JedisPool getConnect() {
 | 
			
		||||
        if (JedisPool == null) {
 | 
			
		||||
 | 
			
		||||
    /* 双重检查锁实现懒汉式单例 */
 | 
			
		||||
    public static Jedis getConnection() {
 | 
			
		||||
        if (jedisPool == null) {
 | 
			
		||||
            synchronized (JedisPoolUtil.class) {
 | 
			
		||||
                if (JedisPool != null) {
 | 
			
		||||
                if (jedisPool == null) {
 | 
			
		||||
                    JedisPoolConfig config = new JedisPoolConfig();
 | 
			
		||||
                    config.setMaxTotal(30);
 | 
			
		||||
                    config.setMaxIdle(10);
 | 
			
		||||
                    JedisPool jedisPool = new JedisPool(config, "localhost", 6379);
 | 
			
		||||
                    jedisPool = new JedisPool(config, HOST, PORT);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return JedisPool;
 | 
			
		||||
        return jedisPool.getResource();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,18 +0,0 @@
 | 
			
		||||
package com.heibaiying.utils
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
 | 
			
		||||
 | 
			
		||||
/**
  * Singleton wrapper around one Jedis connection pool.
  *
  * Initialising this object builds the pool against a Redis server on
  * localhost:6379; connections are then handed out through `getConnection`.
  */
object JedisPoolUtil {

  /* Pool configuration: maxTotal is set to 30 and maxIdle to 10. */
  val config: JedisPoolConfig = {
    val settings = new JedisPoolConfig()
    settings.setMaxTotal(30)
    settings.setMaxIdle(10)
    settings
  }

  /* Connection pool created from `config`. */
  val jedisPool: JedisPool = new JedisPool(config, "localhost", 6379)

  /**
    * Obtains a connection from the pool.
    *
    * @return the [[redis.clients.jedis.Jedis]] resource returned by `jedisPool.getResource`
    */
  def getConnection: Jedis = jedisPool.getResource

}
 | 
			
		||||
		Reference in New Issue
	
	Block a user