streaming整合flume

This commit is contained in:
luoxiang 2019-06-04 21:54:12 +08:00
parent ae0d2664f5
commit 03f9607720

View File

@ -24,9 +24,7 @@ Apache Flume是一个分布式高可用的数据收集系统可以从不
## 二、推送式方法
在推送式方法(Flume-style Push-based Approach)中Spark Streaming程序需要对某台服务器的某个端口进行监听Flume通过`avro Sink`将数据源源不断推送到该端口。
这里以日志文件内容为例将不断新增的日志文件内容推送到Streaming程序中具体整合方式如下
在推送式方法(Flume-style Push-based Approach)中Spark Streaming程序需要对某台服务器的某个端口进行监听Flume通过`avro Sink`将数据源源不断推送到该端口。这里以监听日志文件为例,具体整合方式如下:
### 2.1 配置日志收集Flume
@ -111,13 +109,10 @@ object PushBasedWordCount {
### 2.4 项目打包
因为Spark安装目录下是不含有`spark-streaming-flume`依赖包的,所以在提交到集群运行时候必须提供该依赖包,你可以在提交命令中使用`--jar`指定上传到服务器的该依赖包,或者使用`--packages org.apache.spark:spark-streaming-flume_2.12:2.4.3`指定依赖包的完整名称,这样程序在启动时会先去中央仓库进行下载,这要求你的生产环境必须网络畅通
因为Spark安装目录下是不含有`spark-streaming-flume`依赖包的,所以在提交到集群运行时候必须提供该依赖包,你可以在提交命令中使用`--jar`指定上传到服务器的该依赖包,或者使用`--packages org.apache.spark:spark-streaming-flume_2.12:2.4.3`指定依赖包的完整名称,这样程序在启动时会先去中央仓库进行下载。
这里我采用的是第三种方式:使用`maven-shade-plugin`插件进行`ALL IN ONE`打包把所有依赖的Jar一并打入最终包中。需要注意的是`spark-streaming`包在Spark安装目录的`jars`目录中已经提供,所以不需要打入。插件配置如下:
> 关于大数据应用常用打包方式单独整理至:[大数据应用常用打包方式](https://github.com/heibaiying/BigData-Notes/blob/master/notes/大数据应用常用打包方式.md)
>
> 本项目完整源码见:[spark-streaming-flume](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-flume)
```xml
<build>
@ -207,6 +202,7 @@ object PushBasedWordCount {
</plugins>
</build>
```
> 本项目完整源码见:[spark-streaming-flume](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-flume)
使用`mvn clean package`命令打包后会生产以下两个Jar包提交`非original`开头的Jar即可。
@ -258,7 +254,7 @@ Spark Streaming程序成功接收到数据并打印输出
## 三、拉取式方法
拉取式方法(Pull-based Approach using a Custom Sink)是将数据推送到SparkSink接收器中此时数据会保持缓冲状态Spark Streaming定时从接收器中拉取数据。这种方式是基于事务的即只有在Spark Streaming接收和复制数据完成后才会删除缓的数据。与第一种方式相比,具有更强的可靠性和容错保证。整合步骤如下:
拉取式方法(Pull-based Approach using a Custom Sink)是将数据推送到`SparkSink`接收器中此时数据会保持缓冲状态Spark Streaming定时从接收器中拉取数据。这种方式是基于事务的即只有在Spark Streaming接收和复制数据完成后才会删除缓的数据。与第一种方式相比,具有更强的可靠性和容错保证。整合步骤如下:
### 3.1 配置日志收集Flume
@ -306,7 +302,7 @@ a1.channels.c1.transactionCapacity = 100
</dependency>
```
注意:添加这两个依赖只是为了本地开发测试Spark的安装目录下已经提供了这两个依赖所以在最终打包时需要进行排除。
注意添加这两个依赖只是为了本地测试Spark的安装目录下已经提供了这两个依赖所以在最终打包时需要进行排除。
### 2.3 Spark Streaming接收日志数据
@ -359,4 +355,5 @@ spark-submit \
## 参考资料
1. [streaming-flume-integration](https://spark.apache.org/docs/latest/streaming-flume-integration.html)
- [streaming-flume-integration](https://spark.apache.org/docs/latest/streaming-flume-integration.html)
- 关于大数据应用常用的打包方式可以参见:[大数据应用常用打包方式](https://github.com/heibaiying/BigData-Notes/blob/master/notes/大数据应用常用打包方式.md)