storm编程模型详解

This commit is contained in:
罗祥 2019-04-16 10:57:11 +08:00
parent 4883e0a564
commit 4cffbfa41d
8 changed files with 137 additions and 14 deletions

View File

@ -6,7 +6,7 @@
<groupId>com.heibaiying</groupId>
<artifactId>storm-word-count</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
<build>
<plugins>
<plugin>

View File

@ -0,0 +1,32 @@
package com.heibaiying.wordcount;
import com.heibaiying.wordcount.component.CountBolt;
import com.heibaiying.wordcount.component.DataSourceSpout;
import com.heibaiying.wordcount.component.SplitBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
public class ClusterWordCountApp {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
// 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
// 指明将 SplitBolt 的数据发送到 CountBolt 处理
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
// 使用StormSubmitter提交Topology到服务器集群
try {
StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
}
}

View File

@ -7,7 +7,7 @@ import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
public class WordCountApp{
public class LocalWordCountApp {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
@ -19,7 +19,7 @@ public class WordCountApp{
// 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountTopology",
cluster.submitTopology("LocalWordCountApp",
new Config(), builder.createTopology());
}

View File

@ -10,10 +10,12 @@
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-IBolt-接口">4.1 IBolt 接口</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42-BaseRichBolt抽象类">4.2 BaseRichBolt抽象类</a><br/>
<a href="#五词频统计案例">五、词频统计案例</a><br/>
<a href="#六提交到服务器运行">六、提交到服务器运行</a><br/>
<a href="#六提交到服务器集群运行">六、提交到服务器集群运行</a><br/>
</nav>
## 一、简介
下图为Strom的运行流程图也是storm的编程模型图在storm 进行流处理时我们需要自定义实现自己的spout数据源和bolt处理单元并通过`TopologyBuilder`将它们之间进行关联,定义好数据处理的流程。
@ -218,7 +220,19 @@ public interface IRichBolt extends IBolt, IComponent {
### 5.2 代码实现
#### 1.DataSourceSpout
#### 1. 项目依赖
```xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>
```
#### 2. DataSourceSpout
```java
public class DataSourceSpout extends BaseRichSpout {
@ -273,7 +287,7 @@ HBase Hive
Hadoop Spark HBase Storm
```
#### 2. SplitBolt
#### 3. SplitBolt
```java
public class SplitBolt extends BaseRichBolt {
@ -301,7 +315,7 @@ public class SplitBolt extends BaseRichBolt {
}
```
#### 3. CountBolt
#### 4. CountBolt
```java
public class CountBolt extends BaseRichBolt {
@ -335,15 +349,14 @@ public class CountBolt extends BaseRichBolt {
}
```
#### 4. WordCountApp
#### 5. LocalWordCountApp
通过TopologyBuilder将上面定义好的组件进行串联形成 Topology并提交到本地集群LocalCluster运行。在通常开发中可用本地集群进行测试完成后再提交到服务器集群运行。
```java
public class WordCountApp{
public class LocalWordCountApp {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
@ -356,7 +369,7 @@ public class WordCountApp{
// 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountTopology",
cluster.submitTopology("LocalWordCountApp",
new Config(), builder.createTopology());
}
@ -365,7 +378,7 @@ public class WordCountApp{
#### 5. 运行结果
#### 6. 运行结果
启动`WordCountApp`的main方法即可运行采用本地模式storm会自动在本地搭建一个集群所以启动的过程会稍慢一点启动成功后即可看到输出日志。
@ -373,5 +386,83 @@ public class WordCountApp{
## 六、提交到服务器运行
## 六、提交到服务器集群运行
#### 6.1 代码更改
提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用`StormSubmitter`进行提交。主要代码如下。
> 为了结构清晰这里新建ClusterWordCountApp类来演示集群提交。实际开发中可以将两种模式的代码写在同一个类中通过外部传参来决定启动何种模式。
```java
public class ClusterWordCountApp {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
// 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
// 指明将 SplitBolt 的数据发送到 CountBolt 中 处理
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
// 使用StormSubmitter提交Topology到服务器集群
try {
StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
}
}
```
#### 6.2 打包上传
打包后上传到服务器任意位置,这里我打包后的名称为`storm-word-count-1.0.jar`
```shell
# mvn clean package -DskipTests=true
```
#### 6.3 提交Topology
使用以下命令提交Topology到集群
```shell
# 命令格式storm jar jar包位置 主类的全路径 ...可选传参
storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp
```
出现`successfully`则代表提交成功
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-submit-success.png"/> </div>
#### 6.4 查看Topology与停止Topology命令行方式
```shell
# 查看所有Topology
storm list
# 停止 storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3
```
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-list-kill.png"/> </div>
#### 6.5 查看Topology与停止Topology界面方式
使用UI界面同样也可进行同样的操作进入WEB UI界面8080端口`Topology Summary`中点击对应Topology 即可进入详情页面进行操作。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-ui-actions.png"/> </div>

View File

@ -58,7 +58,7 @@ export PATH=$STORM_HOME/bin:$PATH
### 4.启动相关进程
因为要启动多个进程,所以这里我们统一用后台进程的方式启动
因为要启动多个进程,所以统一用后台进程的方式启动,采用后台启动时需要进入到`$STORM_HOME/bin`目录下,命令如下:
```shell
# 启动zookeeper

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB