spark-streaming-flume
This commit is contained in:
		| @@ -18,9 +18,8 @@ object PullBasedWordCount { | ||||
|     // 1.获取输入流 | ||||
|     val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888) | ||||
|  | ||||
|     // 2.词频统计 | ||||
|     flumeStream.map(line => new String(line.event.getBody.array()).trim) | ||||
|       .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print() | ||||
|     // 2.打印输入流中的数据 | ||||
|     flumeStream.map(line => new String(line.event.getBody.array()).trim).print() | ||||
|  | ||||
|     ssc.start() | ||||
|     ssc.awaitTermination() | ||||
|   | ||||
| @@ -19,9 +19,8 @@ object PushBasedWordCount { | ||||
|     // 1.获取输入流 | ||||
|     val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888) | ||||
|  | ||||
|     // 2.词频统计 | ||||
|     flumeStream.map(line => new String(line.event.getBody.array()).trim) | ||||
|       .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print() | ||||
|     // 2.打印输入流的数据 | ||||
|     flumeStream.map(line => new String(line.event.getBody.array()).trim).print() | ||||
|  | ||||
|     ssc.start() | ||||
|     ssc.awaitTermination() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user