Flume 基本使用

This commit is contained in:
luoxiang 2019-04-09 22:59:37 +08:00
parent e47314991d
commit 48c8da36aa
6 changed files with 220 additions and 148 deletions

View File

@ -79,10 +79,9 @@ TODO
## 六、Flume
1. Flume简介
1. [Flume简介及基本使用](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Flume简介及基本使用.md)
2. [Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md)
3. Flume的基本使用
4. Flume整合Kafka
3. [Flume整合Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Flume整合Kafka.md)
## 七、Sqoop

View File

@ -1,81 +1,115 @@
# Flume 整合 Kafka
# Flume 整合 Kafka
<nav>
<a href="#一背景">一、背景</a><br/>
<a href="#二整合流程">二、整合流程</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#1-启动Zookeeper和Kafka">1. 启动Zookeeper和Kafka</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#2-创建主题">2. 创建主题</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#3-启动kafka消费者">3. 启动kafka消费者</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#4-配置Flume">4. 配置Flume</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#5-启动Flume">5. 启动Flume</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#6-测试">6. 测试</a><br/>
</nav>
## 一、背景
先说一下,为什么要使用 Flume + Kafka
这里举一个实时流处理的项目为例由于采集的日志数据可能存在峰值和峰谷比如如果是一个电商项目那么峰值就会出现在秒杀时这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理集群处理压力就会过大这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计具有高吞吐等特性能很好的抗住峰值数据的冲击。
## ![flume-kafka](D:\BigData-Notes\pictures\flume-kafka.png)二、整合流程
#### 1. 启动Zookeeper和Kafka
这里我启动一个单节点的Kafka作为测试
```shell
# 启动Zookeeper
zkServer.sh start
# 启动kafka
bin/kafka-server-start.sh config/server.properties
```
#### 2. 创建主题
创建一个主题`flume-kafka`之后flume收集到的数据都会发到这个主题上
```shell
# 创建主题
bin/kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic flume-kafka
# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
```
#### 3. 启动kafka消费者
启动一个消费者,监听我们刚才创建的`flume-kafka`主题
```shell
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka
```
#### 4. 配置Flume
新建配置文件`exec-memory-kafka.properties`文件内容如下。这里我们监听一个名为kafka.log的文件当文件内容有变化时将新增加的内容发送到kafka上。
```properties
a1.sources = s1 a1.channels = c1
a1.sinks = k1
a1.sources.s1.type=exec a1.sources.s1.command=tail -F /tmp/kafka.log a1.sources.s1.channels=c1
#设置Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka地址 a1.sinks.k1.brokerList=hadoop001:9092 #设置发送到Kafka上的主题 a1.sinks.k1.topic=flume-kafka #设置序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1
a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100
```
#### 5. 启动Flume
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console
```
#### 6. 测试
向监听的`/tmp/kafka.log `文件中追加内容查看kafka消费者的输出
![flume-kafka-01](D:\BigData-Notes\pictures\flume-kafka-01.png)
可以看到`flume-kafka`主题的消费端已经收到了对应的消息
![flume-kafka-2](D:\BigData-Notes\pictures\flume-kafka-2.png)
## 一、背景
先说一下,为什么要使用 Flume + Kafka
这里举一个实时流处理的项目为例由于采集的日志数据可能存在峰值和峰谷比如如果是一个电商项目那么峰值就会出现在秒杀时这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理集群处理压力就会过大这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计具有高吞吐等特性能很好的抗住峰值数据的冲击。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-kafka.png"/> </div>
## 二、整合流程
Flume发送数据到Kafka上主要是通过`KafkaSink`来实现的,主要步骤如下:
#### 1. 启动Zookeeper和Kafka
这里可以只启动一个单节点的Kafka作为测试
```shell
# 启动Zookeeper
zkServer.sh start
# 启动kafka
bin/kafka-server-start.sh config/server.properties
```
#### 2. 创建主题
创建一个主题`flume-kafka`之后Flume收集到的数据都会发到这个主题上
```shell
# 创建主题
bin/kafka-topics.sh --create \
--zookeeper hadoop001:2181 \
--replication-factor 1 \
--partitions 1 --topic flume-kafka
# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
```
#### 3. 启动kafka消费者
启动一个消费者,监听我们刚才创建的`flume-kafka`主题
```shell
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka
```
#### 4. 配置Flume
新建配置文件`exec-memory-kafka.properties`,文件内容如下。这里我们监听一个名为`kafka.log`的文件当文件内容有变化时将新增加的内容发送到Kafka的`flume-kafka`主题中。
```properties
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /tmp/kafka.log
a1.sources.s1.channels=c1
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka地址
a1.sinks.k1.brokerList=hadoop001:9092
#设置发送到Kafka上的主题
a1.sinks.k1.topic=flume-kafka
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
```
#### 5. 启动Flume
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console
```
#### 6. 测试
向监听的`/tmp/kafka.log `文件中追加内容查看Kafka消费者的输出
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-kafka-01.png"/> </div>
可以看到`flume-kafka`主题的消费端已经收到了对应的消息
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-kafka-2.png"/> </div>

View File

@ -1,5 +1,18 @@
# Flume 简介及基本使用
<nav>
<a href="#一Flume简介">一、Flume简介</a><br/>
<a href="#二Flume架构和基本概念">二、Flume架构和基本概念</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-基本架构">2.1 基本架构</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-基本概念">2.2 基本概念</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-组件种类">2.3 组件种类</a><br/>
<a href="#三Flume架构模式">三、Flume架构模式</a><br/>
<a href="#四Flume配置格式">四、Flume配置格式</a><br/>
<a href="#五Flume的安装部署">五、Flume安装部署</a><br/>
<a href="#六Flume使用案例">六、Flume使用案例</a><br/>
</nav>
## 一、Flume简介
Apache Flume是一个分布式高可用的数据收集系统可以从不同的数据源收集数据经过聚合后发送到存储系统中。在大数据场景中我们通常使用Flume来进行日志数据收集。
@ -12,7 +25,7 @@ Flume 分为 NG 和 OG (1.0 之前)NG在OG的基础上进行了完全的重
![flume-architecture](D:\BigData-Notes\pictures\flume-architecture.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-architecture.png"/> </div>
### 2.1 基本架构
@ -60,7 +73,7 @@ Sink的主要功能从Channel中读取Evnet并将其存入外部存储系统
4. 其实对于Flume的使用除非有特别的需求否则通过简单的配置组合Flume内置SourceSinkChannel就能满足我们大多数的需求所以对于Flume的基本使用主要是写配置文件为主。
在 Flume 的[官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍并且都有配置样例。同时不同版本的参数可能略有所不同所以建议在使用时选取官网对应版本的User Guide作为主要参考资料。
[Flume官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍并且都有配置样例。同时不同版本的参数可能略有所不同所以建议在使用时选取官网对应版本的User Guide作为主要参考资料。
@ -72,13 +85,13 @@ Flume 支持多种架构模式,分别介绍如下
![flume-multi-agent-flow](D:\BigData-Notes\pictures\flume-multi-agent-flow.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-multi-agent-flow.png"/> </div>
Flume支持跨越多个Agent的数据传递这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型Sink指向Source所在主机名或IP地址和端口。
Flume支持跨越多个Agent的数据传递这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型Sink指向Source所在主机名或IP地址和端口(详细配置见下文案例三)
### 3.2 Consolidation
![flume-consolidation](D:\BigData-Notes\pictures\flume-consolidation.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-consolidation.png"/> </div>
@ -86,19 +99,17 @@ Flume支持跨越多个Agent的数据传递这要求前一个Agent的Sink和
### 3.3 Multiplexing the flow
![flume-multiplexing-the-flow](D:\BigData-Notes\pictures\flume-multiplexing-the-flow.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-multiplexing-the-flow.png"/> </div>
Flume支持从一个source源向多个channnel也就是向多个sink传递事件这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`,即所有channel收到的数据都是相同的当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。
Flume支持从一个source源向多个channnel也就是向多个sink传递事件这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`即所有channel收到的数据都是相同的当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。
## 四、Flume配置格式
### 4.1 一个Agent定义一个数据流
Flume配置通常需要以下两个步骤
在一个Agent定义一个Flow需要以下两个步骤
1. 分别定义好Agent的sourcessinkschannels然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel但一个sink只能配置一个channel。
1. 分别定义好Agent的sourcessinkschannels然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel但一个sink只能配置一个channel。基本格式如下
```shell
<Agent>.sources = <Source>
@ -112,9 +123,10 @@ Flume支持从一个source源向多个channnel也就是向多个sink传递事
<Agent>.sinks.<Sink>.channel = <Channel1>
```
2. 分别定义sourcesinkchannel的具体属性
2. 分别定义sourcesinkchannel的具体属性。基本格式如下:
```shell
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
@ -124,32 +136,29 @@ Flume支持从一个source源向多个channnel也就是向多个sink传递事
<Agent>.sources.<Sink>.<someProperty> = <someValue>
```
### 4.2 一个Agent中定义多个数据流
一个Agent可以包含多个独立的Flow。您可以在配置中列出多个sourcessinkschannels。
```shell
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
```
## 五、Flume的安装部署
为方便大家后期查阅本仓库中所有软件的安装均单独成篇Flume的安装见
[Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md)
## 、Flume使用案例
## 、Flume使用案例
介绍几个Flume的使用案例
+ 案例一:使用监听Flume文件内容变动将新增加的内容输出到控制台
+ 案例二监听指定目录将目录下新增加的文件存储到HDFS
+ 案例三:监听对应的端口,将数据发送到远程服务器,远程服务器接收输出,并输出到控制台
+ 案例一使用Flume监听文件内容变动,将新增加的内容输出到控制台
+ 案例二:使用Flume监听指定目录将目录下新增加的文件存储到HDFS
+ 案例三:使用avro将本服务器收集到的日志数据发送到另外一台服务器
### 5.1 案例一
需求: 监听文件内容变动,将新增加的内容输出到控制台
实现: 使用`Exec Source`配合`tail`命令实现
实现: 主要使用`Exec Source`配合`tail`命令实现
#### 1. 配置
@ -193,11 +202,11 @@ flume-ng agent \
向文件中追加数据
![flume-example-1](D:\BigData-Notes\pictures\flume-example-1.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-1.png"/> </div>
追加内容在日志中显示
![flume-example-2](D:\BigData-Notes\pictures\flume-example-2.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-2.png"/> </div>
@ -255,7 +264,7 @@ flume-ng agent \
# cp log.txt logs/
```
![flume-example-3](D:\BigData-Notes\pictures\flume-example-3.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-3.png"/> </div>
查看上传到HDFS上的文件内容与本地是否一致
@ -263,75 +272,105 @@ flume-ng agent \
# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801
```
![flume-example-4](D:\BigData-Notes\pictures\flume-example-4.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-4.png"/> </div>
### 5.3 案例三
需求: 使用avro将本服务器收集到的数据发送到另外一台服务器
实现:使用`avro sources``avro Sink`实现
#### 1. 配置日志收集Flume
新建配置`netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过`avro sink`发送到hadoop001这台服务器的8888端口
```properties
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop001
a1.sources.s1.port = 44444
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1
#配置sink
#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 4545
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
#### 2. 配置日志聚合Flume
使用 `avro source`监听hadoop001服务器的8888端口将获取到内容输出到控制台
```properties
#指定agent的sources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2
a2.sources = s2
a2.sinks = k2
a2.channels = c2
#配置sources属性
a2.sources.r2.type = avro
a2.sources.r2.bind = hadoop001
a2.sources.r2.port = 6666
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888
#将sources与channels进行绑定
a2.sources.s1.channels = c2
#配置sink
a2.sources.s2.channels = c2
#配置sink
a2.sinks.k2.type = logger
#将sinks与channels进行绑定
a2.sinks.k2.channel = c2
#将sinks与channels进行绑定
a2.sinks.k2.channel = c2
#配置channel类型
a2.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
```
#### 3. 启动
启动日志聚集Flume
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console
```
在启动日志收集Flume:
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console
```
这里建议按以上顺序启动,原因是`avro.source`会先建立与端口的绑定,这样`avro sink`连接时才不会报无法连接到端口的异常但是即使不按顺序启动也是没关系的flume会一直重试与端口的连接直到`avro.source`建立好连接。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-retry.png"/> </div>
#### 4.测试
向文件`tmp/log.txt`中追加内容
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-8.png"/> </div>
可以看到已经从8888端口监听到内容并成功输出到控制台
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flume-example-9.png"/> </div>

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

BIN
pictures/flume-retry.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB