spark straming basis
This commit is contained in:
		@@ -0,0 +1,18 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
 | 
			
		||||
 | 
			
		||||
object JedisPoolUtils {
 | 
			
		||||
 | 
			
		||||
  /*创建Jedis连接池*/
 | 
			
		||||
  val config = new JedisPoolConfig
 | 
			
		||||
  config.setMaxTotal(30)
 | 
			
		||||
  config.setMaxIdle(10)
 | 
			
		||||
  val jedisPool = new JedisPool(config, "localhost", 6379)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def getConnection: Jedis = {
 | 
			
		||||
    jedisPool.getResource
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,28 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  *
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCount {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    /*指定时间间隔*/
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(1))
 | 
			
		||||
 | 
			
		||||
    val lines = ssc.socketTextStream("hadoop001", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
 | 
			
		||||
    /*等待服务结束*/
 | 
			
		||||
    ssc.awaitTermination()
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,43 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  * 词频统计
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCountV2 {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
     * 本地测试时最好指定hadoop用户名,否则会默认使用本地电脑的用户名,
 | 
			
		||||
     * 此时在HDFS上创建目录时可能会抛出权限不足的异常
 | 
			
		||||
     */
 | 
			
		||||
    System.setProperty("HADOOP_USER_NAME", "root")
 | 
			
		||||
 | 
			
		||||
    /*指定时间间隔为5s*/
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 | 
			
		||||
 | 
			
		||||
    /*必须要设置检查点*/
 | 
			
		||||
    ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
 | 
			
		||||
 | 
			
		||||
    /*创建文本输入流,并进行词频统计*/
 | 
			
		||||
    val lines = ssc.socketTextStream("192.168.200.229", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1))
 | 
			
		||||
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
 | 
			
		||||
        val currentCount: Int = values.sum
 | 
			
		||||
        val lastCount: Int = state.getOrElse(0)
 | 
			
		||||
        Some(currentCount + lastCount)
 | 
			
		||||
      })
 | 
			
		||||
      .print()
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
    /*等待服务结束*/
 | 
			
		||||
    ssc.awaitTermination()
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,28 @@
 | 
			
		||||
package com.heibaiying
 | 
			
		||||
 | 
			
		||||
import org.apache.spark.SparkConf
 | 
			
		||||
import org.apache.spark.streaming.{Seconds, StreamingContext}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
  * 词频统计
 | 
			
		||||
  */
 | 
			
		||||
object NetworkWordCountV2 {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def main(args: Array[String]) {
 | 
			
		||||
 | 
			
		||||
    /*指定时间间隔为5s*/
 | 
			
		||||
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
 | 
			
		||||
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 | 
			
		||||
 | 
			
		||||
    /*创建文本输入流,并进行词频统计*/
 | 
			
		||||
    val lines = ssc.socketTextStream("192.168.200.229", 9999)
 | 
			
		||||
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
 | 
			
		||||
 | 
			
		||||
    /*启动服务*/
 | 
			
		||||
    ssc.start()
 | 
			
		||||
    /*等待服务结束*/
 | 
			
		||||
    ssc.awaitTermination()
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,30 @@
 | 
			
		||||
package com.heibaiying.utils;
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.JedisPool;
 | 
			
		||||
import redis.clients.jedis.JedisPoolConfig;
 | 
			
		||||
 | 
			
		||||
public class JedisPoolUtil {
 | 
			
		||||
 | 
			
		||||
    // 必须要声明为 volatile 防止指令重排序
 | 
			
		||||
    private static volatile JedisPool JedisPool = null;
 | 
			
		||||
 | 
			
		||||
    private JedisPoolUtil() {
 | 
			
		||||
        if (JedisPool != null) {
 | 
			
		||||
            throw new RuntimeException("单例模式禁止反射调用!");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static JedisPool getConnect() {
 | 
			
		||||
        if (JedisPool == null) {
 | 
			
		||||
            synchronized (JedisPoolUtil.class) {
 | 
			
		||||
                if (JedisPool != null) {
 | 
			
		||||
                    JedisPoolConfig config = new JedisPoolConfig();
 | 
			
		||||
                    config.setMaxTotal(30);
 | 
			
		||||
                    config.setMaxIdle(10);
 | 
			
		||||
                    JedisPool jedisPool = new JedisPool(config, "localhost", 6379);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return JedisPool;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,18 @@
 | 
			
		||||
package com.heibaiying.utils
 | 
			
		||||
 | 
			
		||||
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
 | 
			
		||||
 | 
			
		||||
object JedisPoolUtil {
 | 
			
		||||
 | 
			
		||||
  /*创建Jedis连接池*/
 | 
			
		||||
  val config = new JedisPoolConfig
 | 
			
		||||
  config.setMaxTotal(30)
 | 
			
		||||
  config.setMaxIdle(10)
 | 
			
		||||
  val jedisPool = new JedisPool(config, "localhost", 6379)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  def getConnection: Jedis = {
 | 
			
		||||
    jedisPool.getResource
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user