diff --git a/README.md b/README.md index 48b7cdc..24eb97d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ -> Java 程序员的大数据学习之路 — — 持续更新中 +> Java 程序员的大数据学习之路(2019) — — 持续更新中 @@ -43,7 +43,7 @@ -> 本仓库涉及的所有软件的详细搭建步骤整理至:[Linux下大数据常用软件安装指南](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Linux中大数据常用软件安装指南.md) +> 为方便查阅,所有软件的安装步骤统一整理至:[Linux下大数据常用软件安装指南](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Linux中大数据常用软件安装指南.md) @@ -81,7 +81,7 @@ TODO 1. Flume简介 2. [Linux环境下Flume的安装部署](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux%E4%B8%8BFlume%E7%9A%84%E5%AE%89%E8%A3%85.md) -3. Flume的使用 +3. Flume的基本使用 4. Flume整合Kafka ## 七、Sqoop diff --git a/notes/Flume整合Kafka.md b/notes/Flume整合Kafka.md new file mode 100644 index 0000000..0371d21 --- /dev/null +++ b/notes/Flume整合Kafka.md @@ -0,0 +1,81 @@ +# Flume 整合 Kafka + +## 一、背景 + +先说一下,为什么要使用 Flume + Kafka? + +这里举一个实时流处理的项目为例,由于采集的日志数据可能存在峰值和峰谷,比如如果是一个电商项目,那么峰值就会出现在秒杀时,这时如果直接将Flume聚合后的数据输入到Storm或者Spark Streaming 中进行处理,集群处理压力就会过大,这时采用Kafka就可以起到削峰的作用。Kafka天生就是为大数据场景而设计,具有高吞吐等特性,能很好的抗住峰值数据的冲击。 + +## ![flume-kafka](D:\BigData-Notes\pictures\flume-kafka.png)二、整合流程 + +#### 1. 启动Zookeeper和Kafka + +这里我启动一个单节点的Kafka作为测试 + +```shell +# 启动Zookeeper +zkServer.sh start + +# 启动kafka +bin/kafka-server-start.sh config/server.properties +``` + +#### 2. 创建主题 + +创建一个主题`flume-kafka`,之后flume收集到的数据都会发到这个主题上 + +```shell +# 创建主题 +bin/kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic flume-kafka + +# 查看创建的主题 +bin/kafka-topics.sh --zookeeper hadoop001:2181 --list +``` + + + +#### 3. 启动kafka消费者 + +启动一个消费者,监听我们刚才创建的`flume-kafka`主题 + +```shell +# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka +``` + + + +#### 4. 配置Flume + +新建配置文件`exec-memory-kafka.properties`,文件内容如下。这里我们监听一个名为kafka.log的文件,当文件内容有变化时,将新增加的内容发送到kafka上。 + +```properties +a1.sources = s1 a1.channels = c1 +a1.sinks = k1 +a1.sources.s1.type=exec a1.sources.s1.command=tail -F /tmp/kafka.log a1.sources.s1.channels=c1 +#设置Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka地址 a1.sinks.k1.brokerList=hadoop001:9092 #设置发送到Kafka上的主题 a1.sinks.k1.topic=flume-kafka #设置序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1 + +a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100 +``` + + + +#### 5. 启动Flume + +```shell +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \ +--name a1 -Dflume.root.logger=INFO,console +``` + + + +#### 6. 测试 + +向监听的`/tmp/kafka.log `文件中追加内容,查看kafka消费者的输出 + +![flume-kafka-01](D:\BigData-Notes\pictures\flume-kafka-01.png) + +可以看到`flume-kafka`主题的消费端已经收到了对应的消息 + +![flume-kafka-2](D:\BigData-Notes\pictures\flume-kafka-2.png) \ No newline at end of file diff --git a/notes/Flume简介及基本使用.md b/notes/Flume简介及基本使用.md new file mode 100644 index 0000000..befcee4 --- /dev/null +++ b/notes/Flume简介及基本使用.md @@ -0,0 +1,337 @@ +# Flume 简介及基本使用 + +## 一、Flume简介 + +Apache Flume是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到存储系统中。在大数据场景中我们通常使用Flume来进行日志数据收集。 + +版本说明: + +Flume 分为 NG 和 OG (1.0 之前),NG在OG的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以NG版本为基础。 + +## 二、Flume架构和基本概念 + + + +![flume-architecture](D:\BigData-Notes\pictures\flume-architecture.png) + +### 2.1 基本架构 + +上图是flume的基本架构图: + +外部数据源以特定格式向Flume发送`events` (事件)。当`source`接收到`events`时,它将其存储到一个或多个`channel`。`channe`会一直保存`events`直到它被`sink`所消费。`sink`的主要功能从`channel`中读取`events`,并将其存入外部存储系统中(如HDFS)或将其转发到下一个Flume的`source`,成功后再从`channel`中移除`events`。 + + + +### 2.2 基本概念 + +以下概念需要记住,在之后的配置文件中会用到: + +**1. Event** +Evnet是可由Flume NG传输的单一数据单元。类似于JMS和消息传递系统中的消息。一个事件由标题和正文组成:前者是键/值映射,后者是任意字节数组。 + +**2. Source** + +数据收集组件,从外部数据源收集数据,并存储到Channel中。 + +**3. Channel** + +Channel是源和接收器之间事件的管道,用于临时存储数据。可以是内存也可以是持久化的文件系统,说明如下。 + ++ Memory Channel : 使用内存,优点是速度快,但是数据可能会丢失。如在突然宕机的情况下,内存中的数据就有丢失的风险; ++ File Channel : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。 + +**4. Sink** + +Sink的主要功能从Channel中读取Evnet,并将其存入外部存储系统中(如HDFS)或将其转发到下一个Flume的Source,成功后再从channel中移除Event。 + +**5. Agent** + +是一个独立的(JVM)进程,包含组件Source、 Channel、 Sink等组件。 + + + +### 2.3 组件种类 + +1. Flume提供了多达几十种类型的Source,比如`Avro Source`,`Thrift Source`,`Exec Source`,`JMS Source`等,使得我们仅仅通过配置类型和参数的方式就能从不同的数据源获取数据; + +2. 与Source相比,Flume也提供了多种Sink,比如`HDFS Sink`,`Hive Sink`,`HBaseSinks`,`Avro Sink`,得益于丰富的Sink,我们也可以仅通过配置就能将收集到的数据输出到指定存储位置; + +3. 同样的Flume也支持多种Channel,比如`Memory Channel`,`JDBC Channel`,`Kafka Channel`,`File Channel`等。 + +4. 其实对于Flume的使用,除非有特别的需求,否则通过简单的配置组合Flume内置Source,Sink,Channel就能满足我们大多数的需求,所以对于Flume的基本使用主要是写配置文件为主。 + + 在 Flume 的[官网](http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)上对所有类型的组件的配置参数均以表格的方式做了详尽的介绍,并且都有配置样例。同时不同版本的参数可能略有所不同,所以建议在使用时,选取官网对应版本的User Guide作为主要参考资料。 + + + +## 三、Flume架构模式 + +Flume 支持多种架构模式,分别介绍如下 + +### 3.1 multi-agent flow + + + +![flume-multi-agent-flow](D:\BigData-Notes\pictures\flume-multi-agent-flow.png) + +Flume支持跨越多个Agent的数据传递,这要求前一个Agent的Sink和下一个Agent的source都必须是`Avro`类型,Sink指向Source所在主机名(或IP地址)和端口。 + +### 3.2 Consolidation + +![flume-consolidation](D:\BigData-Notes\pictures\flume-consolidation.png) + + + +日志收集中常常存在大量的客户端(比如分布式web服务),Flume 支持在使用多个Agent分别收集日志,然后通过一个或者多个Agent聚合后再存储到文件系统中。 + +### 3.3 Multiplexing the flow + +![flume-multiplexing-the-flow](D:\BigData-Notes\pictures\flume-multiplexing-the-flow.png) + +Flume支持从一个source源向多个channnel,也就是向多个sink传递事件,这个操作称之为`fan out`(扇出)。默认情况下`fan out`是向所有的channel复制`event`,即所有channel收到的数据都是相同的,当然Flume也支持在source上自定义一个复用选择器(multiplexing selecto) 来实现我们自己的路由规则。 + + + +## 四、Flume配置格式 + +### 4.1 一个Agent定义一个数据流 + +在一个Agent定义一个Flow需要以下两个步骤: + +1. 分别定义好Agent的sources,sinks,channels,然后将sources和sinks与通道进行绑定。需要注意的是一个source可以配置多个channel,但一个sink只能配置一个channel。 + +```shell +.sources = +.sinks = +.channels = + +# set channel for source +.sources..channels = ... + +# set channel for sink +.sinks..channel = +``` + +2. 分别定义source,sink,channel的具体属性 + +```shell +.sources.. = + +# properties for channels +.channel.. = + +# properties for sinks +.sources.. = +``` + +### 4.2 一个Agent中定义多个数据流 + +一个Agent可以包含多个独立的Flow。您可以在配置中列出多个sources,sinks,channels。 + +```shell +# list the sources, sinks and channels for the agent +.sources = +.sinks = +.channels = +``` + + + +## 五、Flume使用案例 + +介绍几个Flume的使用案例: + ++ 案例一:使用监听Flume文件内容变动,将新增加的内容输出到控制台 ++ 案例二:监听指定目录,将目录下新增加的文件存储到HDFS ++ 案例三:监听对应的端口,将数据发送到远程服务器,远程服务器接收输出,并输出到控制台 + +### 5.1 案例一 + +需求: 监听文件内容变动,将新增加的内容输出到控制台 + +实现: 使用`Exec Source`配合`tail`命令实现 + +#### 1. 配置 + +新建配置文件`exec-memory-logger.properties`,其内容如下: + +```properties +#指定agent的sources,sinks,channels +a1.sources = s1 +a1.sinks = k1 +a1.channels = c1 + +#配置sources属性 +a1.sources.s1.type = exec +a1.sources.s1.command = tail -F /tmp/log.txt +a1.sources.s1.shell = /bin/bash -c + +#将sources与channels进行绑定 +a1.sources.s1.channels = c1 + +#配置sink +a1.sinks.k1.type = logger + +#将sinks与channels进行绑定 +a1.sinks.k1.channel = c1 + +#配置channel类型 +a1.channels.c1.type = memory +``` + +#### 2. 启动  + +```shell +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \ +--name a1 \ +-Dflume.root.logger=INFO,console +``` + +#### 3. 测试 + +向文件中追加数据 + +![flume-example-1](D:\BigData-Notes\pictures\flume-example-1.png) + +追加内容在日志中显示 + +![flume-example-2](D:\BigData-Notes\pictures\flume-example-2.png) + + + +### 5.2 案例二 + +需求: 监听指定目录,将目录下新增加的文件存储到HDFS + +实现:使用`Spooling Directory Source`和`HDFS Sink` + +#### 1. 配置 + +```properties +#指定agent的sources,sinks,channels +a1.sources = s1 +a1.sinks = k1 +a1.channels = c1 + +#配置sources属性 +a1.sources.s1.type =spooldir +a1.sources.s1.spoolDir =/tmp/logs +a1.sources.s1.basenameHeader = true +a1.sources.s1.basenameHeaderKey = fileName +#将sources与channels进行绑定 +a1.sources.s1.channels =c1 + + +#配置sink +a1.sinks.k1.type = hdfs +a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/ +a1.sinks.k1.hdfs.filePrefix = %{fileName} +#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 +a1.sinks.k1.hdfs.fileType = DataStream +a1.sinks.k1.hdfs.useLocalTimeStamp = true +#将sinks与channels进行绑定 +a1.sinks.k1.channel = c1 + +#配置channel类型 +a1.channels.c1.type = memory +``` + +#### 2. 启动 + +```shell +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \ +--name a1 -Dflume.root.logger=INFO,console +``` + +#### 3. 测试 + +拷贝任意文件到监听目录下。可以从日志看到文件上传到HDFS的路径 + +```shell +# cp log.txt logs/ +``` + +![flume-example-3](D:\BigData-Notes\pictures\flume-example-3.png) + +查看上传到HDFS上的文件内容与本地是否一致 + +```shell +# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801 +``` + +![flume-example-4](D:\BigData-Notes\pictures\flume-example-4.png) + + + + + +```properties +#指定agent的sources,sinks,channels +a1.sources = s1 +a1.sinks = k1 +a1.channels = c1 + +#配置sources属性 +a1.sources.s1.type = netcat +a1.sources.s1.bind = hadoop001 +a1.sources.s1.port = 44444 +a1.sources.s1.channels = c1 + +#配置sink +a1.sinks.k1.type = avro +a1.sinks.k1.hostname = hadoop001 +a1.sinks.k1.port = 4545 +a1.sinks.k1.batch-size = 1 +a1.sinks.k1.channel = c1 + +#配置channel类型 +a1.channels.c1.type = memory +a1.channels.c1.capacity = 1000 +a1.channels.c1.transactionCapacity = 100 + + + + +#指定agent的sources,sinks,channels +a2.sources = s2 +a2.sinks = k2 +a2.channels = c2 + +#配置sources属性 +a2.sources.r2.type = avro +a2.sources.r2.bind = hadoop001 +a2.sources.r2.port = 6666 + +#将sources与channels进行绑定 +a2.sources.s1.channels = c2 + +#配置sink +a2.sinks.k2.type = logger + +#将sinks与channels进行绑定 +a2.sinks.k2.channel = c2 + +#配置channel类型 +a2.channels.c2.type = memory +a1.channels.c2.capacity = 1000 +a1.channels.c2.transactionCapacity = 100 + + + + +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ +--name a1 -Dflume.root.logger=INFO,console + + +flume-ng agent \ +--conf conf \ +--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \ +--name a2 -Dflume.root.logger=INFO,console +``` + diff --git a/notes/installation/Linux下Flume的安装.md b/notes/installation/Linux下Flume的安装.md index 11f7f28..21c0d79 100644 --- a/notes/installation/Linux下Flume的安装.md +++ b/notes/installation/Linux下Flume的安装.md @@ -3,7 +3,7 @@ ## 一、前置条件 -Flume需要依赖JDK环境,JDK安装方式见本仓库: +Flume需要依赖JDK 1.8+,JDK安装方式见本仓库: > [Linux环境下JDK安装](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux下JDK安装.md) diff --git a/pictures/flume-architecture.png b/pictures/flume-architecture.png new file mode 100644 index 0000000..e587daf Binary files /dev/null and b/pictures/flume-architecture.png differ diff --git a/pictures/flume-consolidation.png b/pictures/flume-consolidation.png new file mode 100644 index 0000000..85bdd95 Binary files /dev/null and b/pictures/flume-consolidation.png differ diff --git a/pictures/flume-example-1.png b/pictures/flume-example-1.png new file mode 100644 index 0000000..9cfb769 Binary files /dev/null and b/pictures/flume-example-1.png differ diff --git a/pictures/flume-example-2.png b/pictures/flume-example-2.png new file mode 100644 index 0000000..e8c3d83 Binary files /dev/null and b/pictures/flume-example-2.png differ diff --git a/pictures/flume-example-3.png b/pictures/flume-example-3.png new file mode 100644 index 0000000..72a41db Binary files /dev/null and b/pictures/flume-example-3.png differ diff --git a/pictures/flume-example-4.png b/pictures/flume-example-4.png new file mode 100644 index 0000000..6b3e998 Binary files /dev/null and b/pictures/flume-example-4.png differ diff --git a/pictures/flume-example-7.png b/pictures/flume-example-7.png new file mode 100644 index 0000000..ae48ce9 Binary files /dev/null and b/pictures/flume-example-7.png differ diff --git a/pictures/flume-kafka-01.png b/pictures/flume-kafka-01.png new file mode 100644 index 0000000..06d5130 Binary files /dev/null and b/pictures/flume-kafka-01.png differ diff --git a/pictures/flume-kafka-2.png b/pictures/flume-kafka-2.png new file mode 100644 index 0000000..959aa4c Binary files /dev/null and b/pictures/flume-kafka-2.png differ diff --git a/pictures/flume-kafka.png b/pictures/flume-kafka.png new file mode 100644 index 0000000..650a2b3 Binary files /dev/null and b/pictures/flume-kafka.png differ diff --git a/pictures/flume-multi-agent-flow.png b/pictures/flume-multi-agent-flow.png new file mode 100644 index 0000000..b028b8b Binary files /dev/null and b/pictures/flume-multi-agent-flow.png differ diff --git a/pictures/flume-multiplexing-the-flow.png b/pictures/flume-multiplexing-the-flow.png new file mode 100644 index 0000000..ea97b2d Binary files /dev/null and b/pictures/flume-multiplexing-the-flow.png differ diff --git a/resources/Reformatting.java b/resources/Reformatting.java new file mode 100644 index 0000000..9a70635 --- /dev/null +++ b/resources/Reformatting.java @@ -0,0 +1,167 @@ +import javafx.util.Pair; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.util.regex.Pattern.*; + +/** + * @author : heibaiying + * @description : 生成导航和图片格式转换 + */ +public class Reformatting { + + /** + * GITHUB 用户名 + **/ + private static final String GITHUB_USERNAME = "heibaiying"; + /** + * 项目地址 + **/ + private static final String PROJECT_NAME = "BigData-Notes"; + + public static void main(String[] args) { + + if (args.length < 1) { + System.out.println("请输入文件路径"); + return; + } + + String dir = "D:\\BigData-Notes\\notes\\Hbase协处理器.md"; + + String preUrl = "https://github.com/" + GITHUB_USERNAME + "/" + PROJECT_NAME + "/blob/master/pictures/"; + String regex = "(!\\[(\\S*)]\\(\\S:\\\\" + PROJECT_NAME + "\\\\pictures\\\\(\\S*)\\)[^(
)]*?)"; + + List filesList = getAllFile(dir, new ArrayList<>()); + for (String filePath : filesList) { + // 获取文件内容 + String content = getContent(filePath); + // 修改图片 + String newContent = changeImageUrl(content, preUrl, regex); + // 获取全部标题 + List> allTitle = getAllTitle(newContent); + // 生成导航 + String nav = genNav(allTitle); + // 写出并覆盖原文件 + write(filePath, newContent, nav); + } + System.out.println("格式转换成功!"); + } + + + private static String changeImageUrl(String content, String preUrl, String oldImageUrlRegex) { + + //github 支持的居中方式
+ return content.replaceAll(oldImageUrlRegex, + String.format("
", preUrl)); + + } + + private static List getAllFile(String dir, List filesList) { + File file = new File(dir); + //如果是文件 则不遍历 + if (file.isFile() && file.getName().endsWith(".md")) { + filesList.add(file.getAbsolutePath()); + } + //如果是文件夹 则遍历下面的所有文件 + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + if (f.isDirectory() && !f.getName().startsWith(".")) { + getAllFile(f.getAbsolutePath(), filesList); + } else if (f.getName().endsWith(".md")) { + filesList.add(f.getAbsolutePath()); + } + } + } + return filesList; + } + + + private static void write(String filePath, String content, String nav) { + try { + String newContent = ""; + if (content.contains("")) { + // 如果原来有目录则替换 + newContent = content.replaceAll("(?m)()", nav); + } else { + StringBuilder stringBuilder = new StringBuilder(content); + // 如果原来没有目录,则title和正文一个标题间写入 + int index = content.indexOf("## "); + stringBuilder.insert(index - 1, nav); + newContent = stringBuilder.toString(); + } + // 写出覆盖文件 + FileWriter fileWriter = new FileWriter(new File(filePath)); + fileWriter.write(newContent); + fileWriter.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + private static String genNav(List> flagAndTitles) { + StringBuilder builder = new StringBuilder(); + // 目录头 + builder.append("\n"); + return builder.toString(); + } + + private static String genBlank(int i, int scale) { + StringBuilder builder = new StringBuilder(); + for (int j = 0; j < i; j++) { + for (int k = 0; k < scale; k++) { + builder.append(" "); + } + } + return builder.toString(); + } + + private static List> getAllTitle(String content) { + List> list = new ArrayList<>(); + Pattern pattern = compile("(?m)^(#{2,10})\\s?(.*)"); + Matcher matcher = pattern.matcher(content); + while (matcher.find()) { + String group2 = matcher.group(2); + if (!group2.contains("参考资料")) { + list.add(new Pair<>(matcher.group(1), group2)); + } + } + return list; + } + + private static String getContent(String filePath) { + StringBuilder builder = new StringBuilder(); + + try { + FileReader reader = new FileReader(filePath); + char[] chars = new char[1024 * 1024]; + + int read; + while ((read = reader.read(chars)) != -1) { + builder.append(new String(chars, 0, read)); + } + } catch (IOException e) { + e.printStackTrace(); + } + return builder.toString(); + } + +}