diff --git a/code/Storm/storm-word-count/pom.xml b/code/Storm/storm-word-count/pom.xml index da880b7..0a39154 100644 --- a/code/Storm/storm-word-count/pom.xml +++ b/code/Storm/storm-word-count/pom.xml @@ -6,7 +6,7 @@ com.heibaiying storm-word-count - 1.0-SNAPSHOT + 1.0 diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java new file mode 100644 index 0000000..9497e95 --- /dev/null +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java @@ -0,0 +1,32 @@ +package com.heibaiying.wordcount; + +import com.heibaiying.wordcount.component.CountBolt; +import com.heibaiying.wordcount.component.DataSourceSpout; +import com.heibaiying.wordcount.component.SplitBolt; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +public class ClusterWordCountApp { + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("DataSourceSpout", new DataSourceSpout()); + // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理 + builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); + // 指明将 SplitBolt 的数据发送到 CountBolt 中 处理 + builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); + + // 使用StormSubmitter提交Topology到服务器集群 + try { + StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } + +} diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java similarity index 92% rename from code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java rename to code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java index e46d7a3..f2597a7 100644 --- a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java @@ -7,7 +7,7 @@ import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; -public class WordCountApp{ +public class LocalWordCountApp { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); @@ -19,7 +19,7 @@ public class WordCountApp{ // 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可 LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("LocalWordCountTopology", + cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology()); } diff --git a/notes/Storm编程模型详解.md b/notes/Storm编程模型详解.md index 7c3805b..79b0c72 100644 --- a/notes/Storm编程模型详解.md +++ b/notes/Storm编程模型详解.md @@ -10,10 +10,12 @@     4.1 IBolt 接口
    4.2 BaseRichBolt抽象类
五、词频统计案例
-六、提交到服务器运行
+六、提交到服务器集群运行
+ + ## 一、简介 下图为Strom的运行流程图,也是storm的编程模型图,在storm 进行流处理时,我们需要自定义实现自己的spout(数据源)和bolt(处理单元),并通过`TopologyBuilder`将它们之间进行关联,定义好数据处理的流程。 @@ -218,7 +220,19 @@ public interface IRichBolt extends IBolt, IComponent { ### 5.2 代码实现 -#### 1.DataSourceSpout +#### 1. 项目依赖 + +```xml + + org.apache.storm + storm-core + 1.2.2 + +``` + + + +#### 2. DataSourceSpout ```java public class DataSourceSpout extends BaseRichSpout { @@ -273,7 +287,7 @@ HBase Hive Hadoop Spark HBase Storm ``` -#### 2. SplitBolt +#### 3. SplitBolt ```java public class SplitBolt extends BaseRichBolt { @@ -301,7 +315,7 @@ public class SplitBolt extends BaseRichBolt { } ``` -#### 3. CountBolt +#### 4. CountBolt ```java public class CountBolt extends BaseRichBolt { @@ -335,15 +349,14 @@ public class CountBolt extends BaseRichBolt { } ``` -#### 4. WordCountApp +#### 5. LocalWordCountApp 通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。在通常开发中,可用本地集群进行,测试完成后再提交到服务器集群运行。 ```java -public class WordCountApp{ +public class LocalWordCountApp { public static void main(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); @@ -356,7 +369,7 @@ public class WordCountApp{ // 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可 LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("LocalWordCountTopology", + cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology()); } @@ -365,7 +378,7 @@ public class WordCountApp{ -#### 5. 运行结果 +#### 6. 运行结果 启动`WordCountApp`的main方法即可运行,采用本地模式storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。 @@ -373,5 +386,83 @@ public class WordCountApp{ -## 六、提交到服务器运行 +## 六、提交到服务器集群运行 + +#### 6.1 代码更改 + +提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用`StormSubmitter`进行提交。主要代码如下。 + +> 为了结构清晰,这里新建ClusterWordCountApp类来演示集群提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。 + +```java +public class ClusterWordCountApp { + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("DataSourceSpout", new DataSourceSpout()); + + // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理 + builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); + + // 指明将 SplitBolt 的数据发送到 CountBolt 中 处理 + builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); + + // 使用StormSubmitter提交Topology到服务器集群 + try { + StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } + +} +``` + +#### 6.2 打包上传 + +打包后上传到服务器任意位置,这里我打包后的名称为`storm-word-count-1.0.jar` + +```shell +# mvn clean package -DskipTests=true +``` + +#### 6.3 提交Topology + +使用以下命令提交Topology到集群: + +```shell +# 命令格式:storm jar jar包位置 主类的全路径 ...可选传参 +storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp +``` + +出现`successfully`则代表提交成功 + +
+ +#### 6.4 查看Topology与停止Topology(命令行方式) + +```shell +# 查看所有Topology +storm list + +# 停止 storm kill topology-name [-w wait-time-secs] +storm kill ClusterWordCountApp -w 3 +``` + +
+ +#### 6.5 查看Topology与停止Topology(界面方式) + +使用UI界面同样也可进行同样的操作,进入WEB UI界面(8080端口),在`Topology Summary`中点击对应Topology 即可进入详情页面进行操作。 + +
+ + + + + + + + diff --git a/notes/installation/Storm单机版本环境搭建.md b/notes/installation/Storm单机版本环境搭建.md index a1e78b3..33fd993 100644 --- a/notes/installation/Storm单机版本环境搭建.md +++ b/notes/installation/Storm单机版本环境搭建.md @@ -58,7 +58,7 @@ export PATH=$STORM_HOME/bin:$PATH ### 4.启动相关进程 -因为要启动多个进程,所以这里我们统一用后台进程的方式启动 +因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`$STORM_HOME/bin`目录下,命令如下: ```shell # 启动zookeeper diff --git a/pictures/storm-list-kill.png b/pictures/storm-list-kill.png new file mode 100644 index 0000000..2c5deb5 Binary files /dev/null and b/pictures/storm-list-kill.png differ diff --git a/pictures/storm-submit-success.png b/pictures/storm-submit-success.png new file mode 100644 index 0000000..07604c4 Binary files /dev/null and b/pictures/storm-submit-success.png differ diff --git a/pictures/storm-ui-actions.png b/pictures/storm-ui-actions.png new file mode 100644 index 0000000..1f05fc1 Binary files /dev/null and b/pictures/storm-ui-actions.png differ