# Spark Straming 整合 Flume ## 一、简介 Apache Flume是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中。Spark Straming提供了以下两种方式用于Flume的整合。 ## 二、推送式方法 在推送式方法(Flume-style Push-based Approach)中,Spark Streaming程序需要对某台服务器的某个端口进行监听,Flume通过`avro Sink`将数据源源不断推送到该端口。 这里以日志文件内容为例,将不断新增的日志文件内容推送到Streaming程序中,具体整合方式如下: ### 2.1 配置日志收集Flume 新建配置`netcat-memory-avro.properties`,使用`tail`命令监听文件内容变化,然后将新的文件内容通过`avro sink`发送到hadoop001这台服务器的8888端口: ```properties #指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources属性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop001 a1.sinks.k1.port = 8888 a1.sinks.k1.batch-size = 1 a1.sinks.k1.channel = c1 #配置channel类型 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 ``` ### 2.2 项目依赖 项目采用Maven工程进行构建,主要依赖为`spark-streaming`和`spark-streaming-flume`。 ```xml 2.11 2.4.0 org.apache.spark spark-streaming_${scala.version} ${spark.version} org.apache.spark spark-streaming-flume_${scala.version} 2.4.3 ``` ### 2.3 Spark Streaming接收日志数据 调用 FlumeUtils工具类的`createStream`方法,对hadoop001的8888端口进行监听,获取到流数据并进行打印: ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.flume.FlumeUtils object PushBasedWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Seconds(5)) // 1.获取输入流 val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888) // 2.打印输入流的数据 flumeStream.map(line => new String(line.event.getBody.array()).trim).print() ssc.start() ssc.awaitTermination() } } ``` ### 2.4 项目打包 因为Spark安装目录下是不含有`spark-streaming-flume`依赖包的,所以在提交到集群运行时候必须提供该依赖包,你可以在提交命令中使用`--jar`指定上传到服务器的该依赖包,或者使用`--packages org.apache.spark:spark-streaming-flume_2.12:2.4.3`指定依赖包的完整名称,这样程序在启动时会先去中央仓库进行下载,这要求你的生产环境必须网络畅通。 这里我采用的是第三种方式:使用`maven-shade-plugin`插件进行`ALL IN ONE`打包,把所有依赖的Jar一并打入最终包中。需要注意的是`spark-streaming`包在Spark安装目录的`jars`目录中已经提供,所以不需要打入。插件配置如下: > 关于大数据应用常用打包方式单独整理至:[大数据应用常用打包方式](https://github.com/heibaiying/BigData-Notes/blob/master/notes/大数据应用常用打包方式.md) > > 本项目完整源码见:[spark-streaming-flume](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-flume) ```xml org.apache.maven.plugins maven-compiler-plugin 8 8 org.apache.maven.plugins maven-shade-plugin true *:* META-INF/*.SF META-INF/*.sf META-INF/*.DSA META-INF/*.dsa META-INF/*.RSA META-INF/*.rsa META-INF/*.EC META-INF/*.ec META-INF/MSFTSIG.SF META-INF/MSFTSIG.RSA org.apache.spark:spark-streaming_${scala.version} org.scala-lang:scala-library org.apache.commons:commons-lang3 package shade org.scala-tools maven-scala-plugin 2.15.1 scala-compile compile **/*.scala scala-test-compile testCompile ``` 使用`mvn clean package`命令打包后会生产以下两个Jar包,提交`非original`开头的Jar即可。
### 2.5 启动服务和提交作业 启动Flume服务: ```shell flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ --name a1 -Dflume.root.logger=INFO,console ``` 提交Spark Streaming作业: ```shell spark-submit \ --class com.heibaiying.flume.PushBasedWordCount \ --master local[4] \ /usr/appjar/spark-streaming-flume-1.0.jar ``` ### 2.6 测试 这里使用`echo`命令模拟日志产生的场景,往日志文件中追加数据,然后查看程序的输出:
Spark Streaming程序成功接收到数据并打印输出:
### 2.7 注意事项 #### 1. 启动顺序 这里需要注意的,不论你先启动Spark程序还是Flume程序,由于两者的启动都需要一定的时间,此时先启动的程序会短暂地抛出端口拒绝连接的异常,此时不需要进行任何操作,等待两个程序都启动完成即可。
#### 2. 版本一致 最好保证用于本地开发和编译的Scala版本和Spark的Scala版本一致,至少保证大版本一致,如都是`2.11`。
## 三、拉取式方法 拉取式方法(Pull-based Approach using a Custom Sink)是将数据推送到SparkSink接收器中,此时数据会保持缓冲状态,Spark Streaming定时从接收器中拉取数据。这种方式是基于事务的,即只有在Spark Streaming接收和复制数据完成后,才会删除缓冲的数据。与第一种方式相比,具有更强的可靠性和容错保证。整合步骤如下: ### 3.1 配置日志收集Flume 新建Flume配置文件`netcat-memory-sparkSink.properties`,配置和上面基本一致,只是把`a1.sinks.k1.type`的属性修改为`org.apache.spark.streaming.flume.sink.SparkSink`,即采用Spark接收器。 ```properties #指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources属性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname = hadoop001 a1.sinks.k1.port = 8888 a1.sinks.k1.batch-size = 1 a1.sinks.k1.channel = c1 #配置channel类型 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 ``` ### 2.2 新增依赖 使用拉取式方法需要额外添加以下两个依赖: ```xml org.scala-lang scala-library 2.12.8 org.apache.commons commons-lang3 3.5 ``` 注意:添加这两个依赖只是为了本地开发测试,Spark的安装目录下已经提供了这两个依赖,所以在最终打包时需要进行排除。 ### 2.3 Spark Streaming接收日志数据 这里和上面推送式方法的代码基本相同,只是将调用方法改为`createPollingStream`。 ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.flume.FlumeUtils object PullBasedWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Seconds(5)) // 1.获取输入流 val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888) // 2.打印输入流中的数据 flumeStream.map(line => new String(line.event.getBody.array()).trim).print() ssc.start() ssc.awaitTermination() } } ``` ### 2.4 启动测试 启动和提交作业流程与上面相同,这里给出执行脚本,过程不再赘述。 启动Flume进行日志收集: ```shell flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-sparkSink.properties \ --name a1 -Dflume.root.logger=INFO,console ``` 提交Spark Streaming作业: ```shel spark-submit \ --class com.heibaiying.flume.PullBasedWordCount \ --master local[4] \ /usr/appjar/spark-streaming-flume-1.0.jar ``` ## 参考资料 1. [streaming-flume-integration](https://spark.apache.org/docs/latest/streaming-flume-integration.html)