This commit is contained in:
luoxiang 2019-05-22 00:18:50 +08:00
parent 38ae95ae97
commit 45846d9573
3 changed files with 187 additions and 0 deletions

View File

@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.heibaiying</groupId>
<artifactId>spark-streaming-flume</artifactId>
<version>1.0</version>
<properties>
<scala.version>2.11</scala.version>
<spark.version>2.4.0</spark.version>
</properties>
<dependencies>
<!-- Spark Streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming整合Flume依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.version}</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!--使用shade进行打包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.apache.spark:spark-streaming_${scala.version}</exclude>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.apache.commons:commons-lang3</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.1</version>
<executions>
<execution>
<id>scala-compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,29 @@
package com.heibaiying.flume
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils
/**
* @author : heibaiying
* 使用自定义接收器的基于拉的方法获取数据
*/
object PullBasedWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 1.获取输入流
val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888)
// 2.词频统计
flumeStream.map(line => new String(line.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}

View File

@ -0,0 +1,29 @@
package com.heibaiying.flume
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils
/**
* @author : heibaiying
* 基于推的方法获取数据
*/
object PushBasedWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 1.获取输入流
val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888)
// 2.词频统计
flumeStream.map(line => new String(line.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}