diff --git a/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555310515/1027/worker.yaml b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555310515/1027/worker.yaml new file mode 100644 index 0000000..ecad21e --- /dev/null +++ b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555310515/1027/worker.yaml @@ -0,0 +1,4 @@ +worker-id: 7b8e6dbf-1e3e-4368-8f0c-1a4936042ca7 +logs.users: [] +logs.groups: [] +topology.submitter.user: ciic diff --git a/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311430/1027/worker.yaml b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311430/1027/worker.yaml new file mode 100644 index 0000000..a3d2249 --- /dev/null +++ b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311430/1027/worker.yaml @@ -0,0 +1,4 @@ +worker-id: 931219fd-8b9a-4333-9fda-5d1df11a258c +logs.users: [] +logs.groups: [] +topology.submitter.user: ciic diff --git a/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311784/1024/worker.yaml b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311784/1024/worker.yaml new file mode 100644 index 0000000..e627b4a --- /dev/null +++ b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555311784/1024/worker.yaml @@ -0,0 +1,4 @@ +worker-id: 9cdf2e0f-b135-41c6-b3fd-1502afddf212 +logs.users: [] +logs.groups: [] +topology.submitter.user: ciic diff --git a/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555313003/1027/worker.yaml b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555313003/1027/worker.yaml new file mode 100644 index 0000000..677f718 --- /dev/null +++ b/code/Storm/storm-word-count/logs/workers-artifacts/LocalWordCountTopology-1-1555313003/1027/worker.yaml @@ -0,0 +1,4 @@ +worker-id: 9837751b-8320-4651-b325-3c64898b976d +logs.users: [] +logs.groups: [] +topology.submitter.user: ciic diff --git a/code/Storm/storm-word-count/pom.xml b/code/Storm/storm-word-count/pom.xml new file mode 100644 index 0000000..da880b7 --- /dev/null +++ b/code/Storm/storm-word-count/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + com.heibaiying + storm-word-count + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + + org.apache.storm + storm-core + 1.2.2 + + + + \ No newline at end of file diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java new file mode 100644 index 0000000..e46d7a3 --- /dev/null +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/WordCountApp.java @@ -0,0 +1,26 @@ +package com.heibaiying.wordcount; + +import com.heibaiying.wordcount.component.CountBolt; +import com.heibaiying.wordcount.component.DataSourceSpout; +import com.heibaiying.wordcount.component.SplitBolt; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.topology.TopologyBuilder; + +public class WordCountApp{ + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("DataSourceSpout", new DataSourceSpout()); + // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理 + builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); + // 指明将 SplitBolt 的数据发送到 CountBolt 中 处理 + builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); + + // 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可 + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWordCountTopology", + new Config(), builder.createTopology()); + } + +} diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/CountBolt.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/CountBolt.java new file mode 100644 index 0000000..5fd3170 --- /dev/null +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/CountBolt.java @@ -0,0 +1,40 @@ +package com.heibaiying.wordcount.component; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.Map; + +public class CountBolt extends BaseRichBolt { + + private Map counts = new HashMap<>(); + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + + } + + @Override + public void execute(Tuple input) { + String word = input.getStringByField("word"); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + // 输出 + System.out.print("当前实时统计结果:"); + counts.forEach((key, value) -> System.out.print(key + ":" + value + "; ")); + System.out.println(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/DataSourceSpout.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/DataSourceSpout.java new file mode 100644 index 0000000..8ca0d78 --- /dev/null +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/DataSourceSpout.java @@ -0,0 +1,49 @@ +package com.heibaiying.wordcount.component; + +import org.apache.storm.shade.org.apache.commons.lang.StringUtils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.*; + +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} diff --git a/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/SplitBolt.java b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/SplitBolt.java new file mode 100644 index 0000000..5cf1c2b --- /dev/null +++ b/code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/SplitBolt.java @@ -0,0 +1,35 @@ +package com.heibaiying.wordcount.component; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +public class SplitBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + @Override + public void execute(Tuple input) { + String line = input.getStringByField("line"); + String[] words = line.split("\t"); + for (String word : words) { + collector.emit(new Values(word)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } +} diff --git a/notes/storm编程模型.md b/notes/storm编程模型.md index 21eca93..7b88c29 100644 --- a/notes/storm编程模型.md +++ b/notes/storm编程模型.md @@ -1,14 +1,28 @@ # Storm 编程模型 + + + ## 一、简介 下图为Strom的运行流程图,也是storm的编程模型图,在storm 进行流处理时,我们需要自定义实现自己的spout(数据源)和bolt(处理单元),并通过`TopologyBuilder`将它们之间进行关联,定义好数据处理的流程。 下面小结分别介绍如何按照storm内置接口分别实现spout和bolt,然后将其进行关联,最后将其提交到本地和服务器进行运行。 -![spout-bolt](D:\BigData-Notes\pictures\spout-bolt.png) +
-## 二、IComponent +## 二、IComponent接口 `IComponent`接口定义了Topology中所有组件(spout/bolt)的公共方法,我们实现spout或bolt都必须直接或者间接实现这个接口。 @@ -30,7 +44,7 @@ public interface IComponent extends Serializable { } ``` -## 三、spout +## 三、Spout ### 3.1 ISpout接口 @@ -84,7 +98,7 @@ public interface ISpout extends Serializable { **通常情况下,我们实现自定义的Spout时不会直接去实现`ISpout`接口,而是继承`BaseRichSpout`。**`BaseRichSpout`继承自`BaseCompont`,同时实现了`IRichSpout`接口。 -![storm-baseRichSpout](D:\BigData-Notes\pictures\storm-baseRichSpout.png) +
`IRichSpout`接口继承自`ISpout`和`IComponent`,自身并没有定义任何方法。 @@ -134,7 +148,7 @@ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout -## 四、bolt +## 四、Bolt 通过上小结我们已经了解了storm如何对spout接口进行设计的,bolt接口的设计也是一样的。 @@ -175,7 +189,7 @@ public interface IBolt extends Serializable { 同样的,在实现我们自己的bolt时,我们也通常是继承`BaseRichBolt`抽象类来实现。`BaseRichBolt`继承自`BaseComponent`抽象类,并实现了`IRichBolt`接口。 -![storm-baseRichbolt](D:\BigData-Notes\pictures\storm-baseRichbolt.png) +
`IRichBolt`接口继承自`IBolt`和`IComponent`,自身并没有定义任何方法。 @@ -187,14 +201,176 @@ public interface IRichBolt extends IBolt, IComponent { 通过这样的设计,我们在继承`BaseRichBolt`实现自己的bolt时,就只需要实现三个必须的方法: -- prepare: 来源于IBolt,可以通过此方法获取用来发送tuples的`SpoutOutputCollector`; +- prepare: 来源于IBolt,可以通过此方法获取用来发送tuples的`OutputCollector`; - execute:来源于IBolt,处理tuple和发送处理完成的tuple; - declareOutputFields :来源于IComponent,通过此方法声明发送的tuple的名称,这样下一个组件才能知道如何接受数据。 -## 五、使用案例 +## 五、词频统计案例 + +### 5.1 案例简介 + +使用模拟数据进行词频统计。这里我们使用自定义的DataSourceSpout产生模拟数据。实际生产环境中通常是通过日志收集引擎(如Flume、logstash等)将收集到的数据发送到kafka指定的topic,通过storm内置的`KafkaSpout `来监听该topic,并获取数据。 + +
+ +### 5.2 代码实现 + +#### 1.DataSourceSpout + +```java +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} +``` + +上面类使用`productData`方法来产生模拟数据,产生数据的格式如下: + +```properties +Spark HBase +Hive Flink Storm Hadoop HBase Spark +Flink +HBase Storm +HBase Hadoop Hive Flink +HBase Flink Hive Storm +Hive Flink Hadoop +HBase Hive +Hadoop Spark HBase Storm +``` + +#### 2. SplitBolt + +```java +public class SplitBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + @Override + public void execute(Tuple input) { + String line = input.getStringByField("line"); + String[] words = line.split("\t"); + for (String word : words) { + collector.emit(new Values(word)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } +} +``` + +#### 3. CountBolt + +```java +public class CountBolt extends BaseRichBolt { + + private Map counts = new HashMap<>(); + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + + } + + @Override + public void execute(Tuple input) { + String word = input.getStringByField("word"); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + // 输出 + System.out.print("当前实时统计结果:"); + counts.forEach((key, value) -> System.out.print(key + ":" + value + "; ")); + System.out.println(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} +``` + +#### 4. WordCountApp + +通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。在通常开发中,可用本地集群进行,测试完成后再提交到服务器集群运行。 + +```java +public class WordCountApp{ + + public static void main(String[] args) { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("DataSourceSpout", new DataSourceSpout()); + + // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理 + builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); + + // 指明将 SplitBolt 的数据发送到 CountBolt 中 处理 + builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); + + // 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可 + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWordCountTopology", + new Config(), builder.createTopology()); + } + +} +``` -## 六、提交到服务器运行 \ No newline at end of file +#### 5. 运行结果 + +启动`WordCountApp`的main方法即可运行,采用本地模式storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。 + +
+ + + +## 六、提交到服务器运行 + diff --git a/pictures/storm-word-count-console.png b/pictures/storm-word-count-console.png new file mode 100644 index 0000000..597110c Binary files /dev/null and b/pictures/storm-word-count-console.png differ diff --git a/pictures/storm-word-count-p.png b/pictures/storm-word-count-p.png new file mode 100644 index 0000000..8b55d17 Binary files /dev/null and b/pictures/storm-word-count-p.png differ