This commit is contained in:
罗祥
2019-05-27 16:51:24 +08:00
parent 5407561d44
commit ec5c799eb1
7 changed files with 203 additions and 11 deletions

View File

@ -16,7 +16,7 @@ object NetworkWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*创建文本输入流,并进行词频统计*/
val lines = ssc.socketTextStream("192.168.200.229", 9999)
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
/*启动服务*/

View File

@ -15,11 +15,11 @@ object NetworkWordCountToRedis {
def main(args: Array[String]) {
/*指定时间间隔为5s*/
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*创建文本输入流,并进行词频统计*/
val lines = ssc.socketTextStream("192.168.200.229", 9999)
val lines = ssc.socketTextStream("hadoop001", 9999)
val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
pairs.foreachRDD { rdd =>

View File

@ -18,20 +18,16 @@ object NetworkWordCountV2 {
System.setProperty("HADOOP_USER_NAME", "root")
/*指定时间间隔为5s*/
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*必须要设置检查点*/
ssc.checkpoint("hdfs://192.168.200.229:8020/spark-streaming")
ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
/*创建文本输入流,并进行词频统计*/
val lines = ssc.socketTextStream("192.168.200.229", 9999)
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1))
.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
val currentCount: Int = values.sum
val lastCount: Int = state.getOrElse(0)
Some(currentCount + lastCount)
})
.updateStateByKey[Int](updateFunction _)
.print()
/*启动服务*/
@ -40,4 +36,18 @@ object NetworkWordCountV2 {
ssc.awaitTermination()
}
/**
* 累计求和
*
* @param currentValues 当前的数据
* @param preValues 之前的数据
* @return 相加后的数据
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}