diff --git a/code/spark/spark-streaming-flume/pom.xml b/code/spark/spark-streaming-flume/pom.xml
new file mode 100644
index 0000000..c8fe1e8
--- /dev/null
+++ b/code/spark/spark-streaming-flume/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>spark-streaming-flume</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <scala.version>2.11</scala.version>
+        <spark.version>2.4.0</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Spark Streaming -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <!-- Spark Streaming Flume integration -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-flume_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <!-- Scala library matching the _2.11 Spark artifacts above -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.5</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+            <!-- Build an uber jar; strip signature files and exclude artifacts provided by the cluster -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.apache.spark:spark-streaming_${scala.version}</exclude>
+                            <exclude>org.scala-lang:scala-library</exclude>
+                            <exclude>org.apache.commons:commons-lang3</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- Compile Scala sources -->
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.1</version>
+                <executions>
+                    <execution>
+                        <id>scala-compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <includes>
+                                <include>**/*.scala</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PullBasedWordCount.scala b/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PullBasedWordCount.scala
new file mode 100644
index 0000000..e0d1ba5
--- /dev/null
+++ b/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PullBasedWordCount.scala
@@ -0,0 +1,29 @@
+package com.heibaiying.flume
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.flume.FlumeUtils
+
+/**
+ * @author : heibaiying
+ * Pull-based approach: Spark polls data from a custom Flume sink
+ */
+object PullBasedWordCount {
+
+  def main(args: Array[String]): Unit = {
+
+    val sparkConf = new SparkConf()
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+
+    // 1. Create the input stream by polling the Flume sink at hadoop001:8888
+    val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888)
+
+    // 2. Word count over each 5-second batch
+    flumeStream.map(line => new String(line.event.getBody.array()).trim)
+      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+
+}
diff --git a/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PushBasedWordCount.scala b/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PushBasedWordCount.scala
new file mode 100644
index 0000000..6f5d6c0
--- /dev/null
+++ b/code/spark/spark-streaming-flume/src/main/scala/com/heibaiying/flume/PushBasedWordCount.scala
@@ -0,0 +1,29 @@
+package com.heibaiying.flume
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.flume.FlumeUtils
+
+
+/**
+ * @author : heibaiying
+ * Push-based approach: Flume pushes data to the Spark receiver
+ */
+object PushBasedWordCount {
+
+  def main(args: Array[String]): Unit = {
+
+    val sparkConf = new SparkConf()
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+
+    // 1. Create the input stream; the receiver listens on hadoop001:8888
+    val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888)
+
+    // 2. Word count over each 5-second batch
+    flumeStream.map(line => new String(line.event.getBody.array()).trim)
+      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
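Both jobs assume a Flume agent wired to hadoop001:8888. For reference, a minimal agent sketch for PushBasedWordCount follows; the agent/source/channel names and the netcat source on port 44444 are illustrative assumptions, while the avro sink host and port come from the code above.

# flume-push.properties (sketch; names and netcat source are assumptions)
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# netcat source: type text into `telnet localhost 44444` to generate events
a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444
a1.sources.s1.channels = c1

a1.channels.c1.type = memory

# push model: the avro sink pushes events to the Spark receiver
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.channel = c1

For PullBasedWordCount the sink would instead be the custom Spark sink the job polls (a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink), which also requires the spark-streaming-flume-sink jar and its Scala dependencies on the Flume agent's classpath.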