Storm编程模型详解
This commit is contained in:
parent
403c6c9994
commit
a4e334e4a9
@ -20,15 +20,13 @@
|
||||
|
||||
## 一、简介
|
||||
|
||||
下图为Strom的运行流程图,也是storm的编程模型图,在storm 进行流处理时,我们需要自定义实现自己的spout(数据源)和bolt(处理单元),并通过`TopologyBuilder`将它们之间进行关联,定义好数据处理的流程。
|
||||
|
||||
下面小结分别介绍如何按照storm内置接口分别实现spout和bolt,然后将其进行关联,最后将其提交到本地和服务器进行运行。
|
||||
下图为Strom的运行流程图,在开发Storm流处理程序时,我们需要采用内置或自定义实现`spout`(数据源)和`bolt`(处理单元),并通过`TopologyBuilder`将它们之间进行关联,形成`Topology`。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spout-bolt.png"/> </div>
|
||||
|
||||
## 二、IComponent接口
|
||||
|
||||
`IComponent`接口定义了Topology中所有组件(spout/bolt)的公共方法,我们实现spout或bolt都必须直接或者间接实现这个接口。
|
||||
`IComponent`接口定义了Topology中所有组件(spout/bolt)的公共方法,自定义的spout或bolt都必须直接或间接实现这个接口。
|
||||
|
||||
```java
|
||||
public interface IComponent extends Serializable {
|
||||
@ -52,7 +50,7 @@ public interface IComponent extends Serializable {
|
||||
|
||||
### 3.1 ISpout接口
|
||||
|
||||
实现自定义的spout需要实现`ISpout`,其定义了spout的所有可用方法:
|
||||
自定义的spout需要实现`ISpout`接口,它定义了spout的所有可用方法:
|
||||
|
||||
```java
|
||||
public interface ISpout extends Serializable {
|
||||
@ -81,9 +79,9 @@ public interface ISpout extends Serializable {
|
||||
void deactivate();
|
||||
|
||||
/**
|
||||
* 这是一个核心方法,主要通过在此方法中调用collector将tuples发送给下一个接收器,这个方法必须是非阻塞的。
|
||||
* nextTuple/ack/fail/是在同一个线程中执行的,所以不用考虑线程安全方面。当没有tuples发出时应该让nextTuple
|
||||
* 休眠(sleep)一下,以免浪费CPU。
|
||||
* 这是一个核心方法,主要通过在此方法中调用collector将tuples发送给下一个接收器,这个方法必须是非阻塞的。
|
||||
* nextTuple/ack/fail/是在同一个线程中执行的,所以不用考虑线程安全方面。当没有tuples发出时应该让
|
||||
* nextTuple休眠(sleep)一下,以免浪费CPU。
|
||||
*/
|
||||
void nextTuple();
|
||||
|
||||
@ -105,7 +103,7 @@ public interface ISpout extends Serializable {
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-baseRichSpout.png"/> </div>
|
||||
|
||||
`IRichSpout`接口继承自`ISpout`和`IComponent`,自身并没有定义任何方法。
|
||||
`IRichSpout`接口继承自`ISpout`和`IComponent`,自身并没有定义任何方法:
|
||||
|
||||
```java
|
||||
public interface IRichSpout extends ISpout, IComponent {
|
||||
@ -113,7 +111,7 @@ public interface IRichSpout extends ISpout, IComponent {
|
||||
}
|
||||
```
|
||||
|
||||
BaseComponent 抽象类也仅仅是空实现了`IComponent`的`getComponentConfiguration`方法。
|
||||
`BaseComponent`抽象类空实现了`IComponent`中`getComponentConfiguration`方法:
|
||||
|
||||
```java
|
||||
public abstract class BaseComponent implements IComponent {
|
||||
@ -124,7 +122,7 @@ public abstract class BaseComponent implements IComponent {
|
||||
}
|
||||
```
|
||||
|
||||
`BaseRichSpout`通过继承自`BaseCompont`,同时实现了`IRichSpout`接口,并且空实现了其中部分方法。
|
||||
`BaseRichSpout`继承自`BaseCompont`类并实现了`IRichSpout`接口,并且空实现了其中部分方法:
|
||||
|
||||
```java
|
||||
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
|
||||
@ -145,17 +143,17 @@ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
|
||||
}
|
||||
```
|
||||
|
||||
通过这样的设计,我们在继承`BaseRichSpout`实现自己的spout时,就只需要实现三个必须的方法:
|
||||
通过这样的设计,我们在继承`BaseRichSpout`实现自定义spout时,就只有三个方法必须实现:
|
||||
|
||||
+ open : 来源于ISpout,可以通过此方法获取用来发送tuples的`SpoutOutputCollector`;
|
||||
+ nextTuple :来源于ISpout,必须在此方法内部才能调用`SpoutOutputCollector`发送tuple;
|
||||
+ declareOutputFields :来源于IComponent,通过此方法声明发送的tuple的名称,这样下一个组件才能知道如何接受数据。
|
||||
+ **open** : 来源于ISpout,可以通过此方法获取用来发送tuples的`SpoutOutputCollector`;
|
||||
+ **nextTuple** :来源于ISpout,必须在此方法内部发送tuples;
|
||||
+ **declareOutputFields** :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接受。
|
||||
|
||||
|
||||
|
||||
## 四、Bolt
|
||||
|
||||
通过上小结我们已经了解了storm如何对spout接口进行设计的,bolt接口的设计也是一样的。
|
||||
bolt接口的设计与spout的类似:
|
||||
|
||||
### 4.1 IBolt 接口
|
||||
|
||||
@ -192,11 +190,11 @@ public interface IBolt extends Serializable {
|
||||
|
||||
### 4.2 BaseRichBolt抽象类
|
||||
|
||||
同样的,在实现我们自己的bolt时,我们也通常是继承`BaseRichBolt`抽象类来实现。`BaseRichBolt`继承自`BaseComponent`抽象类,并实现了`IRichBolt`接口。
|
||||
同样的,在实现自定义bolt时,通常是继承`BaseRichBolt`抽象类来实现。`BaseRichBolt`继承自`BaseComponent`抽象类并实现了`IRichBolt`接口。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-baseRichbolt.png"/> </div>
|
||||
|
||||
`IRichBolt`接口继承自`IBolt`和`IComponent`,自身并没有定义任何方法。
|
||||
`IRichBolt`接口继承自`IBolt`和`IComponent`,自身并没有定义任何方法:
|
||||
|
||||
```
|
||||
public interface IRichBolt extends IBolt, IComponent {
|
||||
@ -204,11 +202,11 @@ public interface IRichBolt extends IBolt, IComponent {
|
||||
}
|
||||
```
|
||||
|
||||
通过这样的设计,我们在继承`BaseRichBolt`实现自己的bolt时,就只需要实现三个必须的方法:
|
||||
通过这样的设计,在继承`BaseRichBolt`实现自定义bolt时,就只需要实现三个必须的方法:
|
||||
|
||||
- prepare: 来源于IBolt,可以通过此方法获取用来发送tuples的`OutputCollector`;
|
||||
- execute:来源于IBolt,处理tuple和发送处理完成的tuple;
|
||||
- declareOutputFields :来源于IComponent,通过此方法声明发送的tuple的名称,这样下一个组件才能知道如何接受数据。
|
||||
- **prepare**: 来源于IBolt,可以通过此方法获取用来发送tuples的`OutputCollector`;
|
||||
- **execute**:来源于IBolt,处理tuples和发送处理完成的tuples;
|
||||
- **declareOutputFields** :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接收。
|
||||
|
||||
|
||||
|
||||
@ -216,7 +214,7 @@ public interface IRichBolt extends IBolt, IComponent {
|
||||
|
||||
### 5.1 案例简介
|
||||
|
||||
使用模拟数据进行词频统计。这里我们使用自定义的DataSourceSpout产生模拟数据。实际生产环境中通常是通过日志收集引擎(如Flume、logstash等)将收集到的数据发送到kafka指定的topic,通过storm内置的`KafkaSpout `来监听该topic,并获取数据。
|
||||
这里我们使用自定义的`DataSourceSpout`产生词频数据,然后使用自定义的`SplitBolt`和`CountBolt`来进行词频统计。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-word-count-p.png"/> </div>
|
||||
|
||||
@ -353,7 +351,7 @@ public class CountBolt extends BaseRichBolt {
|
||||
|
||||
#### 5. LocalWordCountApp
|
||||
|
||||
通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。在通常开发中,可用本地集群进行,测试完成后再提交到服务器集群运行。
|
||||
通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。通常在开发中,可先用本地模式进行测试,测试完成后再提交到服务器集群运行。
|
||||
|
||||
```java
|
||||
public class LocalWordCountApp {
|
||||
@ -382,7 +380,7 @@ public class LocalWordCountApp {
|
||||
|
||||
#### 6. 运行结果
|
||||
|
||||
启动`WordCountApp`的main方法即可运行,采用本地模式storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。
|
||||
启动`WordCountApp`的main方法即可运行,采用本地模式Storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-word-count-console.png"/> </div>
|
||||
|
||||
@ -394,7 +392,7 @@ public class LocalWordCountApp {
|
||||
|
||||
提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用`StormSubmitter`进行提交。主要代码如下。
|
||||
|
||||
> 为了结构清晰,这里新建ClusterWordCountApp类来演示集群提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。
|
||||
> 为了结构清晰,这里新建ClusterWordCountApp类来演示集群模式的提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。
|
||||
|
||||
```java
|
||||
public class ClusterWordCountApp {
|
||||
@ -434,11 +432,11 @@ public class ClusterWordCountApp {
|
||||
使用以下命令提交Topology到集群:
|
||||
|
||||
```shell
|
||||
# 命令格式:storm jar jar包位置 主类的全路径 ...可选传参
|
||||
# 命令格式: storm jar jar包位置 主类的全路径 ...可选传参
|
||||
storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp
|
||||
```
|
||||
|
||||
出现`successfully`则代表提交成功
|
||||
出现`successfully`则代表提交成功:
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-submit-success.png"/> </div>
|
||||
|
||||
@ -456,7 +454,7 @@ storm kill ClusterWordCountApp -w 3
|
||||
|
||||
### 6.5 查看Topology与停止Topology(界面方式)
|
||||
|
||||
使用UI界面同样也可进行同样的操作,进入WEB UI界面(8080端口),在`Topology Summary`中点击对应Topology 即可进入详情页面进行操作。
|
||||
使用UI界面同样也可进行停止操作,进入WEB UI界面(8080端口),在`Topology Summary`中点击对应Topology 即可进入详情页面进行操作。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-ui-actions.png"/> </div>
|
||||
|
||||
@ -472,7 +470,7 @@ storm kill ClusterWordCountApp -w 3
|
||||
|
||||
### mvn package的局限性
|
||||
|
||||
在上面的步骤中,我们没有在POM中配置任何插件,直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。
|
||||
在上面的步骤中,我们没有在POM中配置任何插件,就直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。
|
||||
|
||||
这时候可能大家会有疑惑,在我们的项目中不是使用了`storm-core`这个依赖吗?其实上面之所以我们能运行成功,是因为在Storm的集群环境中提供了这个JAR包,在安装目录的lib目录下:
|
||||
|
||||
@ -501,7 +499,7 @@ private String productData() {
|
||||
}
|
||||
```
|
||||
|
||||
此时直接使用`mvn clean package`打包上传到服务器运行,就会抛出下图异常。所以在此说明一下:这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包的。
|
||||
此时直接使用`mvn clean package`打包运行,就会抛出下图异常的。因此这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-package-error.png"/> </div>
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user