BigData-Notes/notes/storm编程模型.md
2019-04-14 18:25:51 +08:00

6.6 KiB
Raw Blame History

Storm 编程模型

一、简介

下图为Strom的运行流程图也是storm的编程模型图在storm 进行流处理时我们需要自定义实现自己的spout数据源和bolt处理单元并通过TopologyBuilder将它们之间进行关联,定义好数据处理的流程。

下面小结分别介绍如何按照storm内置接口分别实现spout和bolt然后将其进行关联最后将其提交到本地和服务器进行运行。

spout-bolt

二、IComponent

IComponent接口定义了Topology中所有组件spout/bolt的公共方法我们实现spout或bolt都必须直接或者间接实现这个接口。

public interface IComponent extends Serializable {

    /**
     * 声明此拓扑的所有流的输出模式。
     * @param declarer这用于声明输出流id输出字段以及每个输出流是否是直接流direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * 声明此组件的配置。
     *
     */
    Map<String, Object> getComponentConfiguration();

}

三、spout

3.1 ISpout接口

实现自定义的spout需要实现ISpout其定义了spout的所有可用方法

public interface ISpout extends Serializable {
    /**
     * 组件初始化时候被调用
     *
     * @param conf ISpout的配置
     * @param context 应用上下文可以通过其获取任务ID和组件ID输入和输出信息等。
     * @param collector  用来发送spout中的tuples它是线程安全的建议保存为此spout对象的实例变量
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * ISpout将要被关闭的时候调用。但是其不一定会被执行如果在集群环境中通过kill -9 杀死进程时其就无法被执行。
     */
    void close();
    
    /**
     * 当ISpout从停用状态激活时被调用
     */
    void activate();
    
    /**
     * 当ISpout停用时候被调用
     */
    void deactivate();

    /**
     * 这是一个核心方法主要通过在此方法中调用collector将tuples发送给下一个接收器这个方法必须是非阻塞的。              * nextTuple/ack/fail/是在同一个线程中执行的所以不用考虑线程安全方面。当没有tuples发出时应该
     * 让nextTuple休眠sleep一下以免浪费CPU。
     */
    void nextTuple();

    /**
     * 通过msgId进行tuples处理成功的确认被确认后的tuples不会再次被发送
     */
    void ack(Object msgId);

    /**
     * 通过msgId进行tuples处理失败的确认被确认后的tuples会再次被发送进行处理
     */
    void fail(Object msgId);
}

3.2 BaseRichSpout抽象类

通常情况下我们实现自定义的Spout时不会直接去实现ISpout接口,而是继承BaseRichSpoutBaseRichSpout继承自BaseCompont,同时实现了IRichSpout接口。

storm-baseRichSpout

IRichSpout接口继承自ISpoutIComponent,自身并没有定义任何方法。

public interface IRichSpout extends ISpout, IComponent {

}

BaseComponent 抽象类也仅仅是空实现了IComponentgetComponentConfiguration方法。

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }    
}

BaseRichSpout通过继承自BaseCompont,同时实现了IRichSpout接口,并且空实现了其中部分方法。

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}
}

通过这样的设计,我们在继承BaseRichSpout实现自己的spout时就只需要实现三个必须的方法

  • open 来源于ISpout可以通过此方法获取用来发送tuples的SpoutOutputCollector
  • nextTuple 来源于ISpout必须在此方法内部才能调用SpoutOutputCollector发送tuple
  • declareOutputFields 来源于IComponent通过此方法声明发送的tuple的名称这样下一个组件才能知道如何接受数据。

四、bolt

通过上小结我们已经了解了storm如何对spout接口进行设计的bolt接口的设计也是一样的。

4.1 IBolt 接口

 /**
  * 在客户端计算机上创建的IBolt对象。会被被序列化到topology中使用Java序列化,并提交给集群的主机Nimbus  * Nimbus启动workers反序列化对象调用prepare然后开始处理tuples。
 */

public interface IBolt extends Serializable {
    /**
     * 组件初始化时候被调用
     *
     * @param conf storm中定义的此bolt的配置
     * @param context 应用上下文可以通过其获取任务ID和组件ID输入和输出信息等。
     * @param collector  用来发送spout中的tuples它是线程安全的建议保存为此spout对象的实例变量
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * 处理单个tuple输入。
     * 
     * @param Tuple对象包含关于它的元数据如来自哪个组件/流/任务)
     */
    void execute(Tuple input);

    /**
     * IBolt将要被关闭的时候调用。但是其不一定会被执行如果在集群环境中通过kill -9 杀死进程时其就无法被执行。
     */
    void cleanup();

4.2 BaseRichBolt抽象类

同样的在实现我们自己的bolt时我们也通常是继承BaseRichBolt抽象类来实现。BaseRichBolt继承自BaseComponent抽象类,并实现了IRichBolt接口。

storm-baseRichbolt

IRichBolt接口继承自IBoltIComponent,自身并没有定义任何方法。

public interface IRichBolt extends IBolt, IComponent {

}

通过这样的设计,我们在继承BaseRichBolt实现自己的bolt时就只需要实现三个必须的方法

  • prepare 来源于IBolt可以通过此方法获取用来发送tuples的SpoutOutputCollector
  • execute来源于IBolt处理tuple和发送处理完成的tuple
  • declareOutputFields 来源于IComponent通过此方法声明发送的tuple的名称这样下一个组件才能知道如何接受数据。

五、使用案例

六、提交到服务器运行