diff --git a/README.md b/README.md index 24eb97d..078484b 100644 --- a/README.md +++ b/README.md @@ -79,10 +79,9 @@ TODO ## 六、Flume -1. Flume简介 +1. [Flume简介及基本使用](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Flume简介及基本使用.md) 2. [Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md) -3. Flume的基本使用 -4. Flume整合Kafka +3. [Flume整合Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Flume整合Kafka.md) ## 七、Sqoop diff --git a/notes/Flume整合Kafka.md b/notes/Flume整合Kafka.md index 0371d21..68d97b9 100644 --- a/notes/Flume整合Kafka.md +++ b/notes/Flume整合Kafka.md @@ -1,81 +1,115 @@ -# Flume 整合 Kafka +# Flume 整合 Kafka + -## 一、背景 - -先说一下,为什么要使用 Flume + Kafka? - -这里举一个实时流处理的项目为例,由于采集的日志数据可能存在峰值和峰谷,比如如果是一个电商项目,那么峰值就会出现在秒杀时,这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理,集群处理压力就会过大,这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计,具有高吞吐等特性,能很好的抗住峰值数据的冲击。 - -## ![flume-kafka](D:\BigData-Notes\pictures\flume-kafka.png)二、整合流程 - -#### 1. 启动Zookeeper和Kafka - -这里我启动一个单节点的Kafka作为测试 - -```shell -# 启动Zookeeper -zkServer.sh start - -# 启动kafka -bin/kafka-server-start.sh config/server.properties -``` - -#### 2. 创建主题 - -创建一个主题`flume-kafka`,之后flume收集到的数据都会发到这个主题上 - -```shell -# 创建主题 -bin/kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic flume-kafka - -# 查看创建的主题 -bin/kafka-topics.sh --zookeeper hadoop001:2181 --list -``` - - - -#### 3. 启动kafka消费者 - -启动一个消费者,监听我们刚才创建的`flume-kafka`主题 - -```shell -# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka -``` - - - -#### 4. 配置Flume - -新建配置文件`exec-memory-kafka.properties`,文件内容如下。这里我们监听一个名为kafka.log的文件,当文件内容有变化时,将新增加的内容发送到kafka上。 - -```properties -a1.sources = s1 a1.channels = c1 -a1.sinks = k1 -a1.sources.s1.type=exec a1.sources.s1.command=tail -F /tmp/kafka.log a1.sources.s1.channels=c1 -#设置Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka地址 a1.sinks.k1.brokerList=hadoop001:9092 #设置发送到Kafka上的主题 a1.sinks.k1.topic=flume-kafka #设置序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1 - -a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100 -``` - - - -#### 5. 启动Flume - -```shell -flume-ng agent \ ---conf conf \ ---conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \ ---name a1 -Dflume.root.logger=INFO,console -``` - - - -#### 6. 测试 - -向监听的`/tmp/kafka.log `文件中追加内容,查看kafka消费者的输出 - -![flume-kafka-01](D:\BigData-Notes\pictures\flume-kafka-01.png) - -可以看到`flume-kafka`主题的消费端已经收到了对应的消息 - -![flume-kafka-2](D:\BigData-Notes\pictures\flume-kafka-2.png) \ No newline at end of file +## 一、背景 + +先说一下,为什么要使用 Flume + Kafka? + +这里举一个实时流处理的项目为例,由于采集的日志数据可能存在峰值和峰谷,比如如果是一个电商项目,那么峰值就会出现在秒杀时,这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理,集群处理压力就会过大,这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计,具有高吞吐等特性,能很好的抗住峰值数据的冲击。 + +
+ + + +## 二、整合流程 + +Flume发送数据到Kafka上主要是通过`KafkaSink`来实现的,主要步骤如下: + +#### 1. 启动Zookeeper和Kafka + +这里可以只启动一个单节点的Kafka作为测试 + +```shell +# 启动Zookeeper +zkServer.sh start + +# 启动kafka +bin/kafka-server-start.sh config/server.properties +``` + +#### 2. 创建主题 + +创建一个主题`flume-kafka`,之后Flume收集到的数据都会发到这个主题上 + +```shell +# 创建主题 +bin/kafka-topics.sh --create \ +--zookeeper hadoop001:2181 \ +--replication-factor 1 \ +--partitions 1 --topic flume-kafka + +# 查看创建的主题 +bin/kafka-topics.sh --zookeeper hadoop001:2181 --list +``` + + + +#### 3. 启动kafka消费者 + +启动一个消费者,监听我们刚才创建的`flume-kafka`主题 + +```shell +# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka +``` + + + +#### 4. 配置Flume + +新建配置文件`exec-memory-kafka.properties`,文件内容如下。这里我们监听一个名为`kafka.log`的文件,当文件内容有变化时,将新增加的内容发送到Kafka的`flume-kafka`主题中。 + +```properties +a1.sources = s1 +a1.channels = c1 +a1.sinks = k1 + +a1.sources.s1.type=exec +a1.sources.s1.command=tail -F /tmp/kafka.log +a1.sources.s1.channels=c1 + +#设置Kafka接收器 +a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink +#设置Kafka地址 +a1.sinks.k1.brokerList=hadoop001:9092 +#设置发送到Kafka上的主题 +a1.sinks.k1.topic=flume-kafka +#设置序列化方式 +a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder +a1.sinks.k1.channel=c1 + +a1.channels.c1.type=memory +a1.channels.c1.capacity=10000 +a1.channels.c1.transactionCapacity=100 +``` + + + +#### 5. 启动Flume + +```shell +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \ +--name a1 -Dflume.root.logger=INFO,console +``` + + + +#### 6. 测试 + +向监听的`/tmp/kafka.log `文件中追加内容,查看Kafka消费者的输出 + +
+ +可以看到`flume-kafka`主题的消费端已经收到了对应的消息 + +
\ No newline at end of file diff --git a/notes/Flume简介及基本使用.md b/notes/Flume简介及基本使用.md index befcee4..6ca10b9 100644 --- a/notes/Flume简介及基本使用.md +++ b/notes/Flume简介及基本使用.md @@ -1,5 +1,18 @@ # Flume 简介及基本使用 + + + ## 一、Flume简介 Apache Flume是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到存储系统中。在大数据场景中我们通常使用Flume来进行日志数据收集。 @@ -12,7 +25,7 @@ Flume 分为 NG 和 OG (1.0 之前),NG在OG的基础上进行了完全的重 -![flume-architecture](D:\BigData-Notes\pictures\flume-architecture.png) +
### 2.1 基本架构 @@ -60,7 +73,7 @@ Sink的主要功能从Channel中读取Evnet,并将其存入外部存储系统 4. 其实对于Flume的使用,除非有特别的需求,否则通过简单的配置组合Flume内置Source,Sink,Channel就能满足我们大多数的需求,所以对于Flume的基本使用主要是写配置文件为主。 - 在 Flume 的[官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍,并且都有配置样例。同时不同版本的参数可能略有所不同,所以建议在使用时,选取官网对应版本的User Guide作为主要参考资料。 + 在 [Flume官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍,并且都有配置样例。同时不同版本的参数可能略有所不同,所以建议在使用时,选取官网对应版本的User Guide作为主要参考资料。 @@ -72,13 +85,13 @@ Flume 支持多种架构模式,分别介绍如下 -![flume-multi-agent-flow](D:\BigData-Notes\pictures\flume-multi-agent-flow.png) +
-Flume支持跨越多个Agent的数据传递,这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型,Sink指向Source所在主机名(或IP地址)和端口。 +Flume支持跨越多个Agent的数据传递,这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型,Sink指向Source所在主机名(或IP地址)和端口(详细配置见下文案例三)。 ### 3.2 Consolidation -![flume-consolidation](D:\BigData-Notes\pictures\flume-consolidation.png) +
@@ -86,19 +99,17 @@ Flume支持跨越多个Agent的数据传递,这要求前一个Agent的Sink和 ### 3.3 Multiplexing the flow -![flume-multiplexing-the-flow](D:\BigData-Notes\pictures\flume-multiplexing-the-flow.png) +
-Flume支持从一个source源向多个channnel,也就是向多个sink传递事件,这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`,即所有channel收到的数据都是相同的,当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。 +Flume支持从一个source源向多个channnel,也就是向多个sink传递事件,这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`,即所有channel收到的数据都是相同的,当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。 ## 四、Flume配置格式 -### 4.1 一个Agent定义一个数据流 +Flume配置通常需要以下两个步骤: -在一个Agent定义一个Flow需要以下两个步骤: - -1. 分别定义好Agent的sources,sinks,channels,然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel,但一个sink只能配置一个channel。 +1. 分别定义好Agent的sources,sinks,channels,然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel,但一个sink只能配置一个channel。基本格式如下: ```shell .sources = @@ -112,9 +123,10 @@ Flume支持从一个source源向多个channnel,也就是向多个sink传递事 .sinks..channel = ``` -2. 分别定义source,sink,channel的具体属性 +2. 分别定义source,sink,channel的具体属性。基本格式如下: ```shell + .sources.. = # properties for channels @@ -124,32 +136,29 @@ Flume支持从一个source源向多个channnel,也就是向多个sink传递事 .sources.. = ``` -### 4.2 一个Agent中定义多个数据流 -一个Agent可以包含多个独立的Flow。您可以在配置中列出多个sources,sinks,channels。 -```shell -# list the sources, sinks and channels for the agent -.sources = -.sinks = -.channels = -``` +## 五、Flume的安装部署 + +为方便大家后期查阅,本仓库中所有软件的安装均单独成篇,Flume的安装见: + +[Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md) -## 五、Flume使用案例 +## 六、Flume使用案例 介绍几个Flume的使用案例: -+ 案例一:使用监听Flume文件内容变动,将新增加的内容输出到控制台 -+ 案例二:监听指定目录,将目录下新增加的文件存储到HDFS -+ 案例三:监听对应的端口,将数据发送到远程服务器,远程服务器接收输出,并输出到控制台 ++ 案例一:使用Flume监听文件内容变动,将新增加的内容输出到控制台 ++ 案例二:使用Flume监听指定目录,将目录下新增加的文件存储到HDFS ++ 案例三:使用avro将本服务器收集到的日志数据发送到另外一台服务器 ### 5.1 案例一 需求: 监听文件内容变动,将新增加的内容输出到控制台 -实现: 使用`Exec Source`配合`tail`命令实现 +实现: 主要使用`Exec Source`配合`tail`命令实现 #### 1. 配置 @@ -193,11 +202,11 @@ flume-ng agent \ 向文件中追加数据 -![flume-example-1](D:\BigData-Notes\pictures\flume-example-1.png) +
追加内容在日志中显示 -![flume-example-2](D:\BigData-Notes\pictures\flume-example-2.png) +
@@ -255,7 +264,7 @@ flume-ng agent \ # cp log.txt logs/ ``` -![flume-example-3](D:\BigData-Notes\pictures\flume-example-3.png) +
查看上传到HDFS上的文件内容与本地是否一致 @@ -263,75 +272,105 @@ flume-ng agent \ # hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801 ``` -![flume-example-4](D:\BigData-Notes\pictures\flume-example-4.png) +
+### 5.3 案例三 +需求: 使用avro将本服务器收集到的数据发送到另外一台服务器 + +实现:使用`avro sources`和`avro Sink`实现 + +#### 1. 配置日志收集Flume + +新建配置`netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过`avro sink`发送到hadoop001这台服务器的8888端口 ```properties #指定agent的sources,sinks,channels -a1.sources = s1 -a1.sinks = k1 -a1.channels = c1 - +a1.sources = s1 +a1.sinks = k1 +a1.channels = c1 + #配置sources属性 -a1.sources.s1.type = netcat -a1.sources.s1.bind = hadoop001 -a1.sources.s1.port = 44444 +a1.sources.s1.type = exec +a1.sources.s1.command = tail -F /tmp/log.txt +a1.sources.s1.shell = /bin/bash -c a1.sources.s1.channels = c1 - -#配置sink + +#配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop001 -a1.sinks.k1.port = 4545 -a1.sinks.k1.batch-size = 1 -a1.sinks.k1.channel = c1 - +a1.sinks.k1.port = 8888 +a1.sinks.k1.batch-size = 1 +a1.sinks.k1.channel = c1 + #配置channel类型 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 +``` +#### 2. 配置日志聚合Flume +使用 `avro source`监听hadoop001服务器的8888端口,将获取到内容输出到控制台 - +```properties #指定agent的sources,sinks,channels -a2.sources = s2 -a2.sinks = k2 -a2.channels = c2 - +a2.sources = s2 +a2.sinks = k2 +a2.channels = c2 + #配置sources属性 -a2.sources.r2.type = avro -a2.sources.r2.bind = hadoop001 -a2.sources.r2.port = 6666 +a2.sources.s2.type = avro +a2.sources.s2.bind = hadoop001 +a2.sources.s2.port = 8888 #将sources与channels进行绑定 -a2.sources.s1.channels = c2 - -#配置sink +a2.sources.s2.channels = c2 + +#配置sink a2.sinks.k2.type = logger -#将sinks与channels进行绑定 -a2.sinks.k2.channel = c2 - +#将sinks与channels进行绑定 +a2.sinks.k2.channel = c2 + #配置channel类型 a2.channels.c2.type = memory -a1.channels.c2.capacity = 1000 -a1.channels.c2.transactionCapacity = 100 +a2.channels.c2.capacity = 1000 +a2.channels.c2.transactionCapacity = 100 +``` +#### 3. 启动 +启动日志聚集Flume: - -flume-ng agent \ ---conf conf \ ---conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ ---name a1 -Dflume.root.logger=INFO,console - - +```shell flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \ --name a2 -Dflume.root.logger=INFO,console ``` +在启动日志收集Flume: + +```shell +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ +--name a1 -Dflume.root.logger=INFO,console +``` + +这里建议按以上顺序启动,原因是`avro.source`会先建立与端口的绑定,这样`avro sink`连接时才不会报无法连接到端口的异常,但是即使不按顺序启动也是没关系的,flume会一直重试与端口的连接,直到`avro.source`建立好连接。 + +
+ +#### 4.测试 + +向文件`tmp/log.txt`中追加内容 + +
+ +可以看到已经从8888端口监听到内容,并成功输出到控制台 + +
\ No newline at end of file diff --git a/pictures/flume-example-8.png b/pictures/flume-example-8.png new file mode 100644 index 0000000..29c6440 Binary files /dev/null and b/pictures/flume-example-8.png differ diff --git a/pictures/flume-example-9.png b/pictures/flume-example-9.png new file mode 100644 index 0000000..4f7e82b Binary files /dev/null and b/pictures/flume-example-9.png differ diff --git a/pictures/flume-retry.png b/pictures/flume-retry.png new file mode 100644 index 0000000..cf1706e Binary files /dev/null and b/pictures/flume-retry.png differ