This commit is contained in:
罗祥 2019-04-09 18:03:30 +08:00
parent e4dc2b4084
commit e47314991d
17 changed files with 589 additions and 4 deletions

View File

@ -6,7 +6,7 @@
> Java 程序员的大数据学习之路 — — 持续更新中
> Java 程序员的大数据学习之路2019 — — 持续更新中
@ -43,7 +43,7 @@
> 本仓库涉及的所有软件的详细搭建步骤整理至:[Linux下大数据常用软件安装指南](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Linux中大数据常用软件安装指南.md)
> 为方便查阅,所有软件的安装步骤统一整理至:[Linux下大数据常用软件安装指南](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Linux中大数据常用软件安装指南.md)
@ -81,7 +81,7 @@ TODO
1. Flume简介
2. [Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md)
3. Flume的使用
3. Flume的基本使用
4. Flume整合Kafka
## 七、Sqoop

81
notes/Flume整合Kafka.md Normal file
View File

@ -0,0 +1,81 @@
# Flume 整合 Kafka
## 一、背景
先说一下,为什么要使用 Flume + Kafka
这里举一个实时流处理的项目为例由于采集的日志数据可能存在峰值和峰谷比如如果是一个电商项目那么峰值就会出现在秒杀时这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理集群处理压力就会过大这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计具有高吞吐等特性能很好的抗住峰值数据的冲击。
## ![flume-kafka](D:\BigData-Notes\pictures\flume-kafka.png)二、整合流程
#### 1. 启动Zookeeper和Kafka
这里我启动一个单节点的Kafka作为测试
```shell
# 启动Zookeeper
zkServer.sh start
# 启动kafka
bin/kafka-server-start.sh config/server.properties
```
#### 2. 创建主题
创建一个主题`flume-kafka`之后flume收集到的数据都会发到这个主题上
```shell
# 创建主题
bin/kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic flume-kafka
# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
```
#### 3. 启动kafka消费者
启动一个消费者,监听我们刚才创建的`flume-kafka`主题
```shell
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka
```
#### 4. 配置Flume
新建配置文件`exec-memory-kafka.properties`文件内容如下。这里我们监听一个名为kafka.log的文件当文件内容有变化时将新增加的内容发送到kafka上。
```properties
a1.sources = s1 a1.channels = c1
a1.sinks = k1
a1.sources.s1.type=exec a1.sources.s1.command=tail -F /tmp/kafka.log a1.sources.s1.channels=c1
#设置Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka地址 a1.sinks.k1.brokerList=hadoop001:9092 #设置发送到Kafka上的主题 a1.sinks.k1.topic=flume-kafka #设置序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1
a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100
```
#### 5. 启动Flume
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console
```
#### 6. 测试
向监听的`/tmp/kafka.log `文件中追加内容查看kafka消费者的输出
![flume-kafka-01](D:\BigData-Notes\pictures\flume-kafka-01.png)
可以看到`flume-kafka`主题的消费端已经收到了对应的消息
![flume-kafka-2](D:\BigData-Notes\pictures\flume-kafka-2.png)

View File

@ -0,0 +1,337 @@
# Flume 简介及基本使用
## 一、Flume简介
Apache Flume是一个分布式高可用的数据收集系统可以从不同的数据源收集数据经过聚合后发送到存储系统中。在大数据场景中我们通常使用Flume来进行日志数据收集。
版本说明:
Flume 分为 NG 和 OG (1.0 之前)NG在OG的基础上进行了完全的重构是目前使用最为广泛的版本。下面的介绍均以NG版本为基础。
## 二、Flume架构和基本概念
![flume-architecture](D:\BigData-Notes\pictures\flume-architecture.png)
### 2.1 基本架构
上图是flume的基本架构图
外部数据源以特定格式向Flume发送`events` (事件)。当`source`接收到`events`时,它将其存储到一个或多个`channel``channe`会一直保存`events`直到它被`sink`所消费。`sink`的主要功能从`channel`中读取`events`并将其存入外部存储系统中如HDFS或将其转发到下一个Flume的`source`,成功后再从`channel`中移除`events`
### 2.2 基本概念
以下概念需要记住,在之后的配置文件中会用到:
**1. Event**
Evnet是可由Flume NG传输的单一数据单元。类似于JMS和消息传递系统中的消息。一个事件由标题和正文组成前者是键/值映射,后者是任意字节数组。
**2. Source**
数据收集组件从外部数据源收集数据并存储到Channel中。
**3. Channel**
Channel是源和接收器之间事件的管道用于临时存储数据。可以是内存也可以是持久化的文件系统说明如下。
+ Memory Channel : 使用内存,优点是速度快,但是数据可能会丢失。如在突然宕机的情况下,内存中的数据就有丢失的风险;
+ File Channel : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。
**4. Sink**
Sink的主要功能从Channel中读取Evnet并将其存入外部存储系统中如HDFS或将其转发到下一个Flume的Source成功后再从channel中移除Event。
**5. Agent**
是一个独立的JVM进程包含组件Source、 Channel、 Sink等组件。
### 2.3 组件种类
1. Flume提供了多达几十种类型的Source比如`Avro Source``Thrift Source``Exec Source``JMS Source`等,使得我们仅仅通过配置类型和参数的方式就能从不同的数据源获取数据;
2. 与Source相比Flume也提供了多种Sink比如`HDFS Sink``Hive Sink``HBaseSinks``Avro Sink`得益于丰富的Sink我们也可以仅通过配置就能将收集到的数据输出到指定存储位置
3. 同样的Flume也支持多种Channel比如`Memory Channel``JDBC Channel``Kafka Channel``File Channel`等。
4. 其实对于Flume的使用除非有特别的需求否则通过简单的配置组合Flume内置SourceSinkChannel就能满足我们大多数的需求所以对于Flume的基本使用主要是写配置文件为主。
在 Flume 的[官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍并且都有配置样例。同时不同版本的参数可能略有所不同所以建议在使用时选取官网对应版本的User Guide作为主要参考资料。
## 三、Flume架构模式
Flume 支持多种架构模式,分别介绍如下
### 3.1 multi-agent flow
![flume-multi-agent-flow](D:\BigData-Notes\pictures\flume-multi-agent-flow.png)
Flume支持跨越多个Agent的数据传递这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型Sink指向Source所在主机名或IP地址和端口。
### 3.2 Consolidation
![flume-consolidation](D:\BigData-Notes\pictures\flume-consolidation.png)
日志收集中常常存在大量的客户端比如分布式web服务Flume 支持在使用多个Agent分别收集日志然后通过一个或者多个Agent聚合后再存储到文件系统中。
### 3.3 Multiplexing the flow
![flume-multiplexing-the-flow](D:\BigData-Notes\pictures\flume-multiplexing-the-flow.png)
Flume支持从一个source源向多个channnel也就是向多个sink传递事件这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`,即所有channel收到的数据都是相同的当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。
## 四、Flume配置格式
### 4.1 一个Agent定义一个数据流
在一个Agent定义一个Flow需要以下两个步骤
1. 分别定义好Agent的sourcessinkschannels然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel但一个sink只能配置一个channel。
```shell
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
```
2. 分别定义sourcesinkchannel的具体属性
```shell
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
```
### 4.2 一个Agent中定义多个数据流
一个Agent可以包含多个独立的Flow。您可以在配置中列出多个sourcessinkschannels。
```shell
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
```
## 五、Flume使用案例
介绍几个Flume的使用案例
+ 案例一使用监听Flume文件内容变动将新增加的内容输出到控制台
+ 案例二监听指定目录将目录下新增加的文件存储到HDFS
+ 案例三:监听对应的端口,将数据发送到远程服务器,远程服务器接收输出,并输出到控制台
### 5.1 案例一
需求: 监听文件内容变动,将新增加的内容输出到控制台
实现: 使用`Exec Source`配合`tail`命令实现
#### 1. 配置
新建配置文件`exec-memory-logger.properties`,其内容如下:
```properties
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
#将sources与channels进行绑定
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = logger
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
```
#### 2. 启动 
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console
```
#### 3. 测试
向文件中追加数据
![flume-example-1](D:\BigData-Notes\pictures\flume-example-1.png)
追加内容在日志中显示
![flume-example-2](D:\BigData-Notes\pictures\flume-example-2.png)
### 5.2 案例二
需求: 监听指定目录将目录下新增加的文件存储到HDFS
实现:使用`Spooling Directory Source``HDFS Sink`
#### 1. 配置
```properties
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type =spooldir
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
#将sources与channels进行绑定
a1.sources.s1.channels =c1
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型默认是Sequencefile可用DataStream则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
```
#### 2. 启动
```shell
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console
```
#### 3. 测试
拷贝任意文件到监听目录下。可以从日志看到文件上传到HDFS的路径
```shell
# cp log.txt logs/
```
![flume-example-3](D:\BigData-Notes\pictures\flume-example-3.png)
查看上传到HDFS上的文件内容与本地是否一致
```shell
# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801
```
![flume-example-4](D:\BigData-Notes\pictures\flume-example-4.png)
```properties
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop001
a1.sources.s1.port = 44444
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 4545
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#指定agent的sources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2
#配置sources属性
a2.sources.r2.type = avro
a2.sources.r2.bind = hadoop001
a2.sources.r2.port = 6666
#将sources与channels进行绑定
a2.sources.s1.channels = c2
#配置sink
a2.sinks.k2.type = logger
#将sinks与channels进行绑定
a2.sinks.k2.channel = c2
#配置channel类型
a2.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console
```

View File

@ -3,7 +3,7 @@
## 一、前置条件
Flume需要依赖JDK环境JDK安装方式见本仓库
Flume需要依赖JDK 1.8+JDK安装方式见本仓库
> [Linux环境下JDK安装](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux下JDK安装.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

BIN
pictures/flume-kafka-01.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

BIN
pictures/flume-kafka-2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

BIN
pictures/flume-kafka.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 179 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

167
resources/Reformatting.java Normal file
View File

@ -0,0 +1,167 @@
import javafx.util.Pair;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.regex.Pattern.*;
/**
* @author : heibaiying
* @description : 生成导航和图片格式转换
*/
public class Reformatting {
/**
* GITHUB 用户名
**/
private static final String GITHUB_USERNAME = "heibaiying";
/**
* 项目地址
**/
private static final String PROJECT_NAME = "BigData-Notes";
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("请输入文件路径");
return;
}
String dir = "D:\\BigData-Notes\\notes\\Hbase协处理器.md";
String preUrl = "https://github.com/" + GITHUB_USERNAME + "/" + PROJECT_NAME + "/blob/master/pictures/";
String regex = "(!\\[(\\S*)]\\(\\S:\\\\" + PROJECT_NAME + "\\\\pictures\\\\(\\S*)\\)[^(</br>)]*?)";
List<String> filesList = getAllFile(dir, new ArrayList<>());
for (String filePath : filesList) {
// 获取文件内容
String content = getContent(filePath);
// 修改图片
String newContent = changeImageUrl(content, preUrl, regex);
// 获取全部标题
List<Pair<String, String>> allTitle = getAllTitle(newContent);
// 生成导航
String nav = genNav(allTitle);
// 写出并覆盖原文件
write(filePath, newContent, nav);
}
System.out.println("格式转换成功!");
}
private static String changeImageUrl(String content, String preUrl, String oldImageUrlRegex) {
//github 支持的居中方式 <div align="center"> <img src=""/> </div>
return content.replaceAll(oldImageUrlRegex,
String.format("<div align=\"center\"> <img src=\"%s$3\"/> </div>", preUrl));
}
private static List<String> getAllFile(String dir, List<String> filesList) {
File file = new File(dir);
//如果是文件 则不遍历
if (file.isFile() && file.getName().endsWith(".md")) {
filesList.add(file.getAbsolutePath());
}
//如果是文件夹 则遍历下面的所有文件
File[] files = file.listFiles();
if (files != null) {
for (File f : files) {
if (f.isDirectory() && !f.getName().startsWith(".")) {
getAllFile(f.getAbsolutePath(), filesList);
} else if (f.getName().endsWith(".md")) {
filesList.add(f.getAbsolutePath());
}
}
}
return filesList;
}
private static void write(String filePath, String content, String nav) {
try {
String newContent = "";
if (content.contains("<nav>") && content.contains("</nav>")) {
// 如果原来有目录则替换
newContent = content.replaceAll("(?m)(<nav>[\\s\\S]*</nav>)", nav);
} else {
StringBuilder stringBuilder = new StringBuilder(content);
// 如果原来没有目录则title和正文一个标题间写入
int index = content.indexOf("## ");
stringBuilder.insert(index - 1, nav);
newContent = stringBuilder.toString();
}
// 写出覆盖文件
FileWriter fileWriter = new FileWriter(new File(filePath));
fileWriter.write(newContent);
fileWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
private static String genNav(List<Pair<String, String>> flagAndTitles) {
StringBuilder builder = new StringBuilder();
// 目录头
builder.append("<nav>\n");
for (Pair<String, String> ft : flagAndTitles) {
String flag = ft.getKey();
String title = ft.getValue();
builder.append(genBlank(flag.length() - 2, 4));
// Github有效目录格式: <a href="#21-预备">页面锚点</a> url中不能出现特殊符号
String formatTitle = title.trim().replaceAll("[.():|、,@。&/\\\\]", "").replace(" ", "-");
builder.append(String.format("<a href=\"%s\">%s</a><br/>\n", "#" + formatTitle, title));
}
// 目录尾
builder.append("</nav>\n");
return builder.toString();
}
private static String genBlank(int i, int scale) {
StringBuilder builder = new StringBuilder();
for (int j = 0; j < i; j++) {
for (int k = 0; k < scale; k++) {
builder.append("&nbsp;");
}
}
return builder.toString();
}
private static List<Pair<String, String>> getAllTitle(String content) {
List<Pair<String, String>> list = new ArrayList<>();
Pattern pattern = compile("(?m)^(#{2,10})\\s?(.*)");
Matcher matcher = pattern.matcher(content);
while (matcher.find()) {
String group2 = matcher.group(2);
if (!group2.contains("参考资料")) {
list.add(new Pair<>(matcher.group(1), group2));
}
}
return list;
}
private static String getContent(String filePath) {
StringBuilder builder = new StringBuilder();
try {
FileReader reader = new FileReader(filePath);
char[] chars = new char[1024 * 1024];
int read;
while ((read = reader.read(chars)) != -1) {
builder.append(new String(chars, 0, read));
}
} catch (IOException e) {
e.printStackTrace();
}
return builder.toString();
}
}