Storm programming model

罗祥 2019-04-15 16:03:12 +08:00
parent 84e5ba54ec
commit 2ddc8b8669
12 changed files with 383 additions and 9 deletions

View File

@@ -0,0 +1,4 @@
worker-id: 7b8e6dbf-1e3e-4368-8f0c-1a4936042ca7
logs.users: []
logs.groups: []
topology.submitter.user: ciic

View File

@@ -0,0 +1,4 @@
worker-id: 931219fd-8b9a-4333-9fda-5d1df11a258c
logs.users: []
logs.groups: []
topology.submitter.user: ciic

View File

@@ -0,0 +1,4 @@
worker-id: 9cdf2e0f-b135-41c6-b3fd-1502afddf212
logs.users: []
logs.groups: []
topology.submitter.user: ciic

View File

@@ -0,0 +1,4 @@
worker-id: 9837751b-8320-4651-b325-3c64898b976d
logs.users: []
logs.groups: []
topology.submitter.user: ciic

View File

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.heibaiying</groupId>
    <artifactId>storm-word-count</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
        </dependency>
    </dependencies>
</project>

View File

@@ -0,0 +1,26 @@
package com.heibaiying.wordcount;

import com.heibaiying.wordcount.component.CountBolt;
import com.heibaiying.wordcount.component.DataSourceSpout;
import com.heibaiying.wordcount.component.SplitBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class WordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        // Route the output of DataSourceSpout to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        // Route the output of SplitBolt to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Create a local cluster for testing; this mode needs no local Storm
        // installation, just run this main method directly
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountTopology",
                new Config(), builder.createTopology());
    }
}

View File

@@ -0,0 +1,40 @@
package com.heibaiying.wordcount.component;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // This bolt only prints results, so the collector is not needed
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Print the running totals
        System.out.print("Current word counts: ");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing to declare downstream
    }
}

View File

@@ -0,0 +1,49 @@
package com.heibaiying.wordcount.component;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.*;

public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate a line of mock data and emit it
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generates mock data: a tab-separated line of one to six of the words above
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}

View File

@@ -0,0 +1,35 @@
package com.heibaiying.wordcount.component;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Split each incoming line on tabs and emit the individual words
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

View File

@@ -1,14 +1,28 @@
# Storm Programming Model
<nav>
<a href="#一简介">1. Introduction</a><br/>
<a href="#二IComponent接口">2. The IComponent Interface</a><br/>
<a href="#三Spout">3. Spout</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-ISpout接口">3.1 The ISpout Interface</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-BaseRichSpout抽象类">3.2 The BaseRichSpout Abstract Class</a><br/>
<a href="#四Bolt">4. Bolt</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-IBolt-接口">4.1 The IBolt Interface</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42-BaseRichBolt抽象类">4.2 The BaseRichBolt Abstract Class</a><br/>
<a href="#五词频统计案例">5. Word-Count Example</a><br/>
<a href="#六提交到服务器运行">6. Running on a Server Cluster</a><br/>
</nav>
## 1. Introduction

The diagram below shows Storm's execution flow, which is also its programming model. When doing stream processing with Storm, we implement our own spout (data source) and bolt (processing unit), then wire them together with `TopologyBuilder` to define how data flows through the topology.

The following sections show how to implement a spout and a bolt against Storm's built-in interfaces, how to wire them together, and finally how to run the topology both locally and on a server.
![spout-bolt](D:\BigData-Notes\pictures\spout-bolt.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spout-bolt.png"/> </div>
## 2. IComponent
## 2. The IComponent Interface

The `IComponent` interface defines the methods common to all components (spouts and bolts) in a topology; every spout or bolt must implement it, directly or indirectly.
@@ -30,7 +44,7 @@ public interface IComponent extends Serializable {
}
```
## 3. spout
## 3. Spout

### 3.1 The ISpout Interface
@@ -84,7 +98,7 @@ public interface ISpout extends Serializable {
**Normally, when implementing a custom spout, we do not implement the `ISpout` interface directly; instead we extend `BaseRichSpout`.** `BaseRichSpout` extends `BaseComponent` and implements the `IRichSpout` interface.
![storm-baseRichSpout](D:\BigData-Notes\pictures\storm-baseRichSpout.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-baseRichSpout.png"/> </div>
The `IRichSpout` interface extends `ISpout` and `IComponent` and defines no methods of its own.
@@ -134,7 +148,7 @@ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
## 4. bolt
## 4. Bolt

The previous section covered how Storm's spout interfaces are designed; the bolt interfaces follow the same design.
@@ -175,7 +189,7 @@ public interface IBolt extends Serializable {
Likewise, when implementing our own bolt we usually extend the `BaseRichBolt` abstract class, which extends `BaseComponent` and implements the `IRichBolt` interface.
![storm-baseRichbolt](D:\BigData-Notes\pictures\storm-baseRichbolt.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-baseRichbolt.png"/> </div>
The `IRichBolt` interface extends `IBolt` and `IComponent` and defines no methods of its own.
@@ -187,14 +201,176 @@ public interface IRichBolt extends IBolt, IComponent {
With this design, when we extend `BaseRichBolt` to implement our own bolt, only three methods must be implemented:

- prepare: from IBolt; provides the `SpoutOutputCollector` used to emit tuples;
- prepare: from IBolt; provides the `OutputCollector` used to emit tuples;
- execute: from IBolt; processes a tuple and emits the processed result;
- declareOutputFields: from IComponent; declares the field names of the emitted tuples so the next component knows how to receive the data (see the minimal skeleton below).
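
For illustration, here is a minimal skeleton that fills in all three methods: a hypothetical `PassThroughBolt` (not part of this commit) that simply forwards the `line` field it receives.

```java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

// Hypothetical example bolt, for illustration only
public class PassThroughBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Keep the collector so execute() can emit tuples later
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Forward the received value unchanged
        collector.emit(new Values(input.getStringByField("line")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Downstream components see this bolt's output as a one-field tuple named "line"
        declarer.declare(new Fields("line"));
    }
}
```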
## 5. Example
## 5. Word-Count Example

### 5.1 Overview

We run a word-frequency count over mock data, generated here by the custom DataSourceSpout. In a real production environment, data would typically be gathered by a log-collection engine such as Flume or Logstash and sent to a Kafka topic, which Storm then consumes through its built-in `KafkaSpout`.
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-word-count-p.png"/> </div>
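
As a sketch of that production setup, a `KafkaSpout` could replace `DataSourceSpout` roughly as follows. This assumes the separate `storm-kafka-client` dependency (in a version matching `storm-core`); the broker address and topic name are placeholders.

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaWordCountSketch {

    public static void main(String[] args) {
        // Placeholder broker address and topic name
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafka-host:9092", "log-topic").build();

        TopologyBuilder builder = new TopologyBuilder();
        // The KafkaSpout emits tuples carrying the message payload in a field
        // named "value", so a downstream bolt would read that field instead of "line"
        builder.setSpout("KafkaSpout", new KafkaSpout<>(spoutConfig));
    }
}
```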
### 5.2 Implementation

#### 1. DataSourceSpout
```java
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate a line of mock data and emit it
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generates mock data: a tab-separated line of one to six of the words above
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```
The class above uses its `productData` method to generate mock data in the following format:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
#### 2. SplitBolt
```java
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Split each incoming line on tabs and emit the individual words
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
#### 3. CountBolt
```java
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // This bolt only prints results, so the collector is not needed
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Print the running totals
        System.out.print("Current word counts: ");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing to declare downstream
    }
}
```
#### 4. WordCountApp
Finally, `TopologyBuilder` wires the components above into a topology, which is submitted to a local cluster (LocalCluster) to run. During development it is common to test against a local cluster first and submit to a server cluster once everything works.
```java
public class WordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        // Route the output of DataSourceSpout to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        // Route the output of SplitBolt to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Create a local cluster for testing; this mode needs no local Storm
        // installation, just run this main method directly
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountTopology",
                new Config(), builder.createTopology());
    }
}
```
## 6. Running on a Server Cluster
#### 5. Results

Run the main method of `WordCountApp` to start the topology in local mode. Storm automatically spins up a local cluster, so startup takes a little while; once it succeeds, the output log appears.
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-word-count-console.png"/> </div>
## 6. Running on a Server Cluster
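
A minimal sketch of what such a submission could look like, using Storm's `StormSubmitter` in place of `LocalCluster` (the class name `ClusterWordCountApp` and the topology name are illustrative, not part of this commit):

```java
import com.heibaiying.wordcount.component.CountBolt;
import com.heibaiying.wordcount.component.DataSourceSpout;
import com.heibaiying.wordcount.component.SplitBolt;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Illustrative sketch: submits the same topology to a running Storm cluster
public class ClusterWordCountApp {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Unlike LocalCluster, this requires a running cluster; the jar is uploaded
        // with the storm client, e.g.: storm jar storm-word-count-1.0-SNAPSHOT.jar <main-class>
        StormSubmitter.submitTopology("ClusterWordCountTopology",
                new Config(), builder.createTopology());
    }
}
```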

Binary file not shown.

After

Width: | Height: | Size: 39 KiB

Binary file not shown.

After

Width: | Height: | Size: 39 KiB