diff --git a/README.md b/README.md index 832a883..d11355b 100644 --- a/README.md +++ b/README.md @@ -97,12 +97,13 @@ 1. [Storm和流处理简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm和流处理简介.md) 2. [Storm核心概念详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm核心概念详解.md) -3. [Storm单机版本环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm单机版本环境搭建.md) -4. [Storm编程模型详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md) -5. [Storm项目三种打包方式对比分析](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm三种打包方式对比分析.md) -6. [Storm集成Redis详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Redis详解.md) -7. [Storm集成HDFS/HBase](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成HBase和HDFS.md) -8. [Storm集成Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Kakfa.md) +3. [Storm单机环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm单机环境搭建.md) +4. [Storm集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm集群环境搭建.md) +5. [Storm编程模型详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md) +6. [Storm项目三种打包方式对比分析](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm三种打包方式对比分析.md) +7. [Storm集成Redis详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Redis详解.md) +8. [Storm集成HDFS/HBase](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成HBase和HDFS.md) +9. [Storm集成Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Kakfa.md) ## 五、Flink diff --git a/notes/Storm三种打包方式对比分析.md b/notes/Storm三种打包方式对比分析.md index dfe06cb..2cd6bcd 100644 --- a/notes/Storm三种打包方式对比分析.md +++ b/notes/Storm三种打包方式对比分析.md @@ -77,16 +77,40 @@ maven-assembly-plugin是官方文档中介绍的打包方法,来源于官方 > > Then run mvn assembly:assembly to get an appropriately packaged jar. Make sure you [exclude](http://maven.apache.org/plugins/maven-assembly-plugin/examples/single/including-and-excluding-artifacts.html) the Storm jars since the cluster already has Storm on the classpath. -官方文档说明了以下两点: +官方文档主要说明了以下几点: -- maven-assembly-plugin会把所有的依赖一并打包到最后的JAR中; -- 需要排除掉Storm集群环境中已经提供的Storm jars。 +- 使用maven-assembly-plugin可以把所有的依赖一并打入到最后的JAR中; +- 需要排除掉Storm集群环境中已经提供的Storm jars; +- 通过` `标签指定主入口类; +- 通过``标签指定打包相关配置。 -所以采用maven-assembly-plugin进行打包时,配置应该如下: +`jar-with-dependencies`是Maven[预定义](http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html#jar-with-dependencies)的一种最基本的打包配置,其XML文件如下: -### 1. 基本配置 +```xml + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + +``` -在POM.xml中引入插件,并指定打包格式的配置文件`assembly.xml`(名称可自定义): +我们可以通过对该配置文件进行拓展,从而实现更多的功能,比如排除指定的JAR等。使用示例如下: + +### 1. 引入插件 + +在POM.xml中引入插件,并指定打包格式的配置文件为`assembly.xml`(名称可自定义): ```xml @@ -108,7 +132,7 @@ maven-assembly-plugin是官方文档中介绍的打包方法,来源于官方 ``` -assembly.xml文件内容如下: +`assembly.xml`拓展自`jar-with-dependencies.xml`,使用了``标签排除Storm jars,具体内容如下: ```xml ``` -以上配置示例来源于Storm在Github上的examples,这里做一下说明: +以上配置示例来源于Storm Github,这里做一下说明: 在上面的配置中,排除了部分文件,这是因为有些JAR包生成时,会使用jarsigner生成文件签名(完成性校验),分为两个文件存放在META-INF目录下: diff --git a/notes/Storm核心概念详解.md b/notes/Storm核心概念详解.md index 80e478f..c1f5c2e 100644 --- a/notes/Storm核心概念详解.md +++ b/notes/Storm核心概念详解.md @@ -42,16 +42,39 @@ `spouts`和`bolts`在集群上执行任务时,是由多个Task并行执行(如上图,每一个圆圈代表一个Task)。当一个tuple需要从Bolt A发送给Bolt B执行的时候,程序如何知道应该发送给Bolt B的哪一个Task执行呢? -这是由Stream groupings分组策略来决定的。Storm中一共有如下8个内置的 Stream Grouping。你也可以通过实现 `CustomStreamGrouping`接口来自实现自定义Stream groupings。 +这是由Stream groupings分组策略来决定的,Storm中一共有如下8个内置的Stream Grouping。当然你也可以通过实现 `CustomStreamGrouping`接口来实现自定义Stream分组策略。 -1. **Shuffle grouping** : Tuples随机的分发到每个Bolt的每个Task上,每个Bolt获取到等量的Tuples。 -2. **Fields grouping** : Streams通过grouping指定的字段(field)来分组。假设通过 "user-id" 字段进行分区,那么具有相同 "user-id" 的Tuple就会发送到同一个Task; -3. **Partial Key grouping **: Streams通过grouping中指定的字段(field)来分组,与`Fields Grouping`相似。但是对于两个下游的Bolt来说是负载均衡的,可以在输入数据不平均的情况下提供更好的优化。 -4. **All grouping** : Streams会被所有的Bolt的Tasks进行复制。由于存在数据重复处理,所以需要谨慎使用。 -5. **Global grouping** : 整个Streams会进入Bolt的其中一个Task。通常会进入id最小的Task。 -6. **None grouping **: 当前None grouping 和Shuffle grouping 等价,都是进行随机分发; -7. **Direct grouping **: Direct grouping只能被用于direct streams 。使用这种方式需要由Tuple的**生产者**直接指定由哪个Task进行处理; -8. **Local or shuffle grouping** : 如果目标bolt有一个或者多个tasks在同一个worker进程中,tuple将会被shuffled到处于同一个进程的多个tasks上。否则,和普通的Shuffle Grouping行为一致。 +1. **Shuffle grouping** + + Tuples随机的分发到每个Bolt的每个Task上,每个Bolt获取到等量的Tuples。 + +2. **Fields grouping** + + Streams通过grouping指定的字段(field)来分组。假设通过`user-id`字段进行分区,那么具有相同`user-id`的Tuple就会发送到同一个Task。 + +3. **Partial Key grouping ** + + Streams通过grouping中指定的字段(field)来分组,与`Fields Grouping`相似。但是对于两个下游的Bolt来说是负载均衡的,可以在输入数据不平均的情况下提供更好的优化。 + +4. **All grouping** + + Streams会被所有的Bolt的Tasks进行复制。由于存在数据重复处理,所以需要谨慎使用。 + +5. **Global grouping** + + 整个Streams会进入Bolt的其中一个Task。通常会进入id最小的Task。 + +6. **None grouping ** + + 当前None grouping 和Shuffle grouping等价,都是进行随机分发。 + +7. **Direct grouping ** + + Direct grouping只能被用于direct streams 。使用这种方式需要由Tuple的**生产者**直接指定由哪个Task进行处理。 + +8. **Local or shuffle grouping** + + 如果目标bolt有tasks和当前bolt的tasks处在同一个worker进程中,那么则优先将tuples shuffled到处于同一个进程的目标bolt的tasks上,这样可以最大限度地减少网络传输。否则,就和普通的`Shuffle Grouping`行为一致。 @@ -59,49 +82,54 @@
-### 2.1 nimbus进程 +### 2.1 Nimbus进程 - 也叫做 master node,是storm集群工作的全局指挥官。 + 也叫做Master Node,是Storm集群工作的全局指挥官。主要功能如下: -1. 通过thrift接口,监听并接收client对topology的submit, 将topology代码保存到本地目录/nimbus/stormdist/下 ; -2. 为client提交的topology计算任务分配,根据集群worker资源情况,计算出topology的spoult和bolt的tasks应该如何在worker间分配,任务分配结果写入zookeeper ; -3. 通过thrift接口,监听supervisor的下载topology代码的请求,并提供下载 ; -4. 通过thrift接口,监听UI对统计信息的读取,从zookeeper上读取统计信息,返回给UI; +1. 通过Thrift接口,监听并接收Client提交的Topology; +2. 根据集群Workers的资源情况,将Client提交的Topology进行任务分配,分配结果写入zookeeper ; +3. 通过Thrift接口,监听Supervisor的下载Topology代码的请求,并提供下载 ; +4. 通过Thrift接口,监听UI对统计信息的读取,从zookeeper上读取统计信息,返回给UI; 5. 若进程退出后,立即在本机重启,则不影响集群运行。 -### 2.2 supervisor进程 +### 2.2 Supervisor进程 -也叫做 worker node , storm集群的资源管理者,按需启动worker进程。 +也叫做Worker Node , 是Storm集群的资源管理者,按需启动worker进程。主要功能如下: -1. 定时从zookeeper 检查是否有新topology代码未下载到本地 ,并定时删除旧topology代码 ; -2. 根据nimbus的任务分配计划,在本机按需启动1个或多个worker进程,并监控守护所有的worker进程; +1. 定时从Zookeeper检查是否有新Topology代码未下载到本地 ,并定时删除旧Topology代码 ; +2. 根据Nimbus的任务分配计划,在本机按需启动1个或多个Worker进程,并监控所有的worker进程的情况; 3. 若进程退出,立即在本机重启,则不影响集群运行。 ### 2.3 zookeeper的作用 -Nimbus和Supervisor进程都被设计为**快速失败**(遇到任何意外情况时进程自毁)和**无状态**(所有状态保存在Zookeeper或磁盘上)。 这样设计的好处就是如果它们的进程被意外销毁,那么在重新启动后,就只需要并从zookeeper上获取之前的状态数据即可,并不会造成任何影响。 +Nimbus和Supervisor进程都被设计为**快速失败**(遇到任何意外情况时进程自毁)和**无状态**(所有状态保存在Zookeeper或磁盘上)。 这样设计的好处就是如果它们的进程被意外销毁,那么在重新启动后,就只需要从Zookeeper上获取之前的状态数据即可,并不会造成任何数据丢失。 -### 2.4 worker进程 +### 2.4 Worker进程 -Storm集群的任务构造者 ,构造spoult或bolt的task实例,启动executor线程。 +Storm集群的任务构造者 ,构造Spoult或Bolt的Task实例,启动Executor线程。主要功能如下: + +1. 根据Zookeeper上分配的Task,在本进程中启动1个或多个Executor线程,将构造好的Eask实例交给Executor去运行; + +2. 向Zookeeper写入心跳 ; + +3. 维持传输队列,发送Tuple到其他的Worker ; -1. 根据zookeeper上分配的task,在本进程中启动1个或多个executor线程,将构造好的task实例交给executor去运行; -2. 向zookeeper写入心跳 ; -3. 维持传输队列,发送tuple到其他的worker ; 4. 若进程退出,立即在本机重启,则不影响集群运行。 -### 2.5 executor线程 + -storm集群的任务执行者 ,循环执行task代码。 +### 2.5 Executor线程 -1. 执行1个或多个task(每个task对应spout或bolt的1个并行度),将输出加入到worker里的tuple队列 ; -2. 执行storm内部线程acker,负责发送消息处理状态给对应spout所在的worker。 +Storm集群的任务执行者 ,循环执行Task代码。主要功能如下: + +1. 执行1个或多个Task; +2. 执行Acker机制,负责发送Task处理状态给对应Spout所在的worker。 @@ -109,16 +137,19 @@ storm集群的任务执行者 ,循环执行task代码。
-1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。 +1个Worker进程执行的是1个Topology的子集,不会出现1个Worker为多个Topology服务的情况,因此1个运行中的Topology就是由集群中多台物理机上的多个Worker进程组成的。1个Worker进程会启动1个或多个Executor线程来执行1个Topology的Component(组件,即Spout或Bolt)。 -executor是1个被worker进程启动的单独线程。每个executor会运行1个component(spout或bolt)的中的一个或者多个task。 +Executor是1个被Worker进程启动的单独线程。每个Executor会运行1个Component中的一个或者多个Task。 -task是最终运行spout或bolt中代码的单元。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component:`#threads<=#tasks`(即:线程数小于等于task数目)这样的情况是存在的。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。 +Task是组成Component的代码单元。Topology启动后,1个Component的Task数目是固定不变的,但该Component使用的Executor线程数可以动态调整(例如:1个Executor线程可以执行该Component的1个或多个Task实例)。这意味着,对于1个Component来说,`#threads<=#tasks`(线程数小于等于Task数目)这样的情况是存在的。默认情况下Task的数目等于Executor线程数,即1个Executor线程只运行1个Task。 -**在默认情况下**: +**总结如下:**: + ++ 一个运行中的Topology由集群中的多个Worker进程组成的; ++ 在默认情况下,每个Worker进程默认启动一个Executor线程; ++ 在默认情况下,每个Executor默认启动一个Task线程; ++ Task是组成Component的代码单元。 -+ 每个worker进程默认启动一个executor线程 -+ 每个executor默认启动一个task线程 ## 参考资料 diff --git a/notes/installation/Storm单机版本环境搭建.md b/notes/installation/Storm单机环境搭建.md similarity index 92% rename from notes/installation/Storm单机版本环境搭建.md rename to notes/installation/Storm单机环境搭建.md index d39da15..e4e4ca8 100644 --- a/notes/installation/Storm单机版本环境搭建.md +++ b/notes/installation/Storm单机环境搭建.md @@ -46,7 +46,7 @@ export PATH=$STORM_HOME/bin:$PATH ### 4. 启动相关进程 -因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`${STORM_HOME}/bin`目录下,命令如下: +因为要启动多个进程,所以统一采用后台进程的方式启动。进入到`${STORM_HOME}/bin`目录下,依次执行下面的命令: ```shell # 启动zookeeper diff --git a/notes/installation/Storm集群环境搭建.md b/notes/installation/Storm集群环境搭建.md new file mode 100644 index 0000000..852ba66 --- /dev/null +++ b/notes/installation/Storm集群环境搭建.md @@ -0,0 +1,139 @@ +# Storm集群环境搭建 + + + +## 一、集群规划 + +这里我们采用三台服务器搭建一个Storm集群,集群由一个1个Nimbus和3个Supervisor组成,因为只有三台服务器,所以hadoop001上既为Nimbus节点,也为Supervisor节点。 + +
+ +## 二、环境要求 + +Storm 运行依赖于Java 7+ 和 Python 2.6.6 +,所以需要预先安装这两个软件。同时为了保证高可用,这里我们不采用Storm内置的Zookeeper,而采用外置的Zookeeper集群。由于这三个软件在多个框架中都有依赖,其安装步骤单独整理至 : + +- [Linux环境下JDK安装](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux下JDK安装.md) +- [Linux环境下Python安装](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux下Python安装.md) + ++ [Zookeeper单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md) + + + +## 三、安装步骤 + +### 1. 下载并解压 + +下载安装包,使用scp命令分发到三台服务器上,之后进行解压。官方下载地址:http://storm.apache.org/downloads.html + +```shell +# 解压 +tar -zxvf apache-storm-1.2.2.tar.gz + +# 分发 +scp -r /usr/app/apache-storm-1.2.2/ root@hadoop002:/usr/app/ +scp -r /usr/app/apache-storm-1.2.2/ root@hadoop003:/usr/app/ +``` + +### 2. 配置环境变量 + +为了方便,三台服务器均配置一下环境变量: + +```shell +# vim /etc/profile +``` + +添加环境变量: + +```shell +export STORM_HOME=/usr/app/apache-storm-1.2.2 +export PATH=$STORM_HOME/bin:$PATH +``` + +使得配置的环境变量生效: + +```shell +# source /etc/profile +``` + +### 3. 集群配置 + +修改每台服务器上的`${STORM_HOME}/conf/storm.yaml`文件,配置均如下: + +```yaml +# Zookeeper集群的主机列表 +storm.zookeeper.servers: + - "hadoop001" + - "hadoop002" + - "hadoop003" + +# Nimbus的节点列表 +nimbus.seeds: ["hadoop001"] + +# Nimbus和Supervisor需要使用本地磁盘上来存储少量状态(如jar包,配置文件等) +storm.local.dir: "/usr/local/tmp/storm" + +# workers进程的端口,每个worker进程会使用一个端口来接收消息 +supervisor.slots.ports: + - 6700 + - 6701 + - 6702 + - 6703 +``` + +`supervisor.slots.ports`参数用来配置workers进程接收消息的端口,默认每个supervisor节点上会启动4个worker,当然你也可以按照自己的需要和服务器性能进行设置,假设只想启动2个worker的话,此处配置2个端口即可。 + +### 4. 启动服务 + +先启动Zookeeper集群,之后再启动Storm集群。因为要启动多个进程,所以统一采用后台进程的方式启动,进入到`${STORM_HOME}/bin`目录下,依次执行下面的命令: + +**hadoop001** + +因为hadoop001是nimbus节点,所以需要启动nimbus服务和ui服务;同时hadoop001也是supervisor节点,所以需要启动supervisor服务和logviewer服务: + +```shell +# 启动主节点 nimbus +nohup sh storm nimbus & +# 启动从节点 supervisor +nohup sh storm supervisor & +# 启动UI界面 ui +nohup sh storm ui & +# 启动日志查看服务 logviewer +nohup sh storm logviewer & +``` + +**hadoop002 & hadoop003** + +hadoop002和hadoop003都只需要启动supervisor服务和logviewer服务: + +```shell +# 启动从节点 supervisor +nohup sh storm supervisor & +# 启动日志查看服务 logviewer +nohup sh storm logviewer & +``` + + + +### 5. 查看进程 + +使用`jps`查看进程,三台服务器的进程应该分别如下: + +
+ + + +### 6. 查看Web-UI界面 + +访问hadoop001的8080端口,界面应如下图,可以看到有1个Nimbus和3个Supervisor,并且每个Supervisor有四个slots,即四个可用的worker进程,此时代表集群已经搭建成功。 + +
\ No newline at end of file diff --git a/pictures/storm-集群-shell.png b/pictures/storm-集群-shell.png new file mode 100644 index 0000000..457b159 Binary files /dev/null and b/pictures/storm-集群-shell.png differ diff --git a/pictures/storm-集群规划.png b/pictures/storm-集群规划.png new file mode 100644 index 0000000..65e85df Binary files /dev/null and b/pictures/storm-集群规划.png differ diff --git a/pictures/storm集群.png b/pictures/storm集群.png new file mode 100644 index 0000000..64d2f46 Binary files /dev/null and b/pictures/storm集群.png differ