BigData-Notes/notes/Storm集成Redis详解.md
2019-04-20 12:36:57 +08:00

655 lines
21 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Storm 集成 Redis 详解
<nav>
<a href="#一简介">一、简介</a><br/>
<a href="#二集成案例">二、集成案例</a><br/>
<a href="#三storm-redis-实现原理">三、storm-redis 实现原理</a><br/>
<a href="#四自定义RedisBolt实现词频统计">四、自定义RedisBolt实现词频统计</a><br/>
</nav>
## 一、简介
storm-redis提供了Storm与Redis的集成支持,你只需要引入对应的依赖即可使用。
```xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
<type>jar</type>
</dependency>
```
Storm-redis使用Jedis为Redis客户端并提供了如下三个基本的Bolt实现
+ RedisLookupBolt从Redis中查询数据
+ RedisStoreBolt存储数据到Redis
+ RedisFilterBolt查询符合条件的数据。
`RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt`均继承自`AbstractRedisBolt`抽象类。我们可以通过继承该抽象类进行功能的拓展实现自定义RedisBolt。
## 二、集成案例
### 2.1 项目结构
这里首先给出一个集成案例进行词频统计并将最后的结果存储到Redis。项目结构如下
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-wordcounttoredis.png"/> </div>
> 用例源码下载地址:[storm-redis-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-redis-integration)
### 2.2 项目依赖
项目主要依赖如下:
```xml
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
```
### 2.3 DataSourceSpout
```java
/**
 * Spout that produces sample word-frequency data.
 * Emits one tab-separated line of words per second on the "line" field.
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    // Reuse one Random instance instead of allocating a new one per tuple
    private final Random random = new Random();

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Produce one line of simulated data, emit it, then throttle to ~1 tuple/second
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Builds one line of simulated data: shuffles the word list and joins a
     * random-length prefix (at least one word) with tab separators.
     */
    private String productData() {
        Collections.shuffle(list);
        // nextInt(n) already returns a value in [0, n), so the original extra
        // "% list.size()" was redundant; +1 guarantees at least one word.
        int endIndex = random.nextInt(list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```
产生的模拟数据格式如下:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
### 2.4 SplitBolt
```java
/**
 * Splits each incoming line on the tab delimiter and emits one
 * (word, "1") pair per word.
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Split the raw line and forward every word with an initial count of "1"
        for (String word : input.getStringByField("line").split("\t")) {
            collector.emit(new Values(word, String.valueOf(1)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
### 2.5 CountBolt
```java
/**
 * Keeps a running per-word count in local memory and emits (word, count)
 * pairs downstream. Note: counts live only in this bolt's memory, so they
 * are lost if the worker restarts.
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // merge() initializes missing keys to 1 and increments existing ones,
        // replacing the manual null-check / put sequence (Storm 1.2.2 runs on Java 8)
        Integer count = counts.merge(word, 1, Integer::sum);
        // Forward the updated count as a string
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
### 2.6 WordCountStoreMapper
实现RedisStoreMapper接口并定义tuple与Redis中数据的映射关系Redis中存储的是Key/Value键值对并且支持多种数据结构你需要指定tuple中的哪个字段为key哪个字段为value并且存储到什么数据结构中。
```java
/**
 * Maps tuples onto Redis HASH entries: the outer hash key is "wordCount",
 * the hash field comes from the tuple's "word" field, and the stored value
 * comes from its "count" field.
 */
public class WordCountStoreMapper implements RedisStoreMapper {

    private static final String HASH_KEY = "wordCount";

    // Declares once that this mapper targets the HASH structure under HASH_KEY
    private final RedisDataTypeDescription description =
            new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, HASH_KEY);

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}
```
### 2.7 WordCountToRedisApp
```java
/**
 * Word-count topology that persists the final counts to Redis.
 */
public class WordCountToRedisApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String STORE_BOLT = "storeBolt";

    // In real projects these would come from external configuration for flexibility
    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        // Redis connection pool used by the store bolt
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .build();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, new WordCountStoreMapper());

        // Wire up the pipeline: spout -> split -> count -> store
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
        builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);

        // A "cluster" argument submits to a remote cluster; otherwise run locally
        if (args.length > 0 && "cluster".equals(args[0])) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            new LocalCluster().submitTopology("LocalWordCountToRedisApp", new Config(), builder.createTopology());
        }
    }
}
```
### 2.8 启动测试
可以直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用`maven-shade-plugin`进行打包,打包命令如下:
```shell
# mvn clean package -D maven.test.skip=true
```
启动后查看Redis中的数据
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/store-redis-manager.png"/> </div>
## 三、storm-redis 实现原理
### 3.1 AbstractRedisBolt
`RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt`均继承自`AbstractRedisBolt`抽象类,和我们自定义实现Bolt一样,`AbstractRedisBolt`间接继承自`BaseRichBolt`
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-abstractRedisBolt.png"/> </div>
`AbstractRedisBolt`中比较重要的是prepare方法在该方法中通过外部传入的jedis连接池配置( jedisPoolConfig/jedisClusterConfig) 创建用于管理Jedis实例的容器`JedisCommandsInstanceContainer`
```java
// Excerpt from storm-redis: common base class for all Redis bolts.
public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
protected OutputCollector collector;
// transient: the container is rebuilt in prepare() on each worker, never serialized
private transient JedisCommandsInstanceContainer container;
// Exactly one of these two configs is expected to be set (standalone vs. cluster mode)
private JedisPoolConfig jedisPoolConfig;
private JedisClusterConfig jedisClusterConfig;
......
// Builds the JedisCommandsInstanceContainer from whichever config was supplied;
// fails fast if neither a pool config nor a cluster config is present.
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
// FIXME: stores map (stormConf), topologyContext and expose these to derived classes
this.collector = collector;
if (jedisPoolConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
} else if (jedisClusterConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found");
}
}
.......
}
```
`JedisCommandsInstanceContainer`的`build()`方法如下实际上就是创建JedisPool或JedisCluster并传入容器中。
```java
// Standalone mode: wraps a JedisPool built from the single-host pool config.
public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
return new JedisContainer(jedisPool);
}
// Cluster mode: wraps a JedisCluster built from the cluster node set.
public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
return new JedisClusterContainer(jedisCluster);
}
```
### 3.2 RedisStoreBolt和RedisLookupBolt
`RedisStoreBolt`中比较重要的是process方法该方法主要从storeMapper中获取传入key/value的值并按照其存储类型`dataType`调用jedisCommand的对应方法进行存储。
RedisLookupBolt 的实现基本类似从lookupMapper中获取传入的key值并进行查询操作。
```java
// Excerpt from storm-redis: stores each tuple into Redis according to the
// data type declared by the supplied RedisStoreMapper.
public class RedisStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
private final RedisDataTypeDescription.RedisDataType dataType;
// Outer key for structures that need one (HASH / SORTED_SET / GEO)
private final String additionalKey;
// Standalone-Redis constructor
public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
// Redis-cluster constructor
public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
// Extracts key/value from the tuple via the mapper, dispatches to the Jedis
// command matching the declared data type, and acks on success / fails on error.
@Override
public void process(Tuple input) {
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
switch (dataType) {
case STRING:
jedisCommand.set(key, value);
break;
case LIST:
jedisCommand.rpush(key, value);
break;
case HASH:
// For hashes, additionalKey is the hash name and key becomes the field
jedisCommand.hset(additionalKey, key, value);
break;
case SET:
jedisCommand.sadd(key, value);
break;
case SORTED_SET:
// value is parsed as the score, key is the member
jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
break;
case HYPER_LOG_LOG:
jedisCommand.pfadd(key, value);
break;
case GEO:
// value must be "longitude:latitude"
String[] array = value.split(":");
if (array.length != 2) {
throw new IllegalArgumentException("value structure should be longitude:latitude");
}
double longitude = Double.valueOf(array[0]);
double latitude = Double.valueOf(array[1]);
jedisCommand.geoadd(additionalKey, longitude, latitude, key);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
collector.ack(input);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
} finally {
// Always return the borrowed Jedis instance to the container
returnInstance(jedisCommand);
}
}
.........
}
```
### 3.3 JedisCommands
JedisCommands接口中定义了所有的 Redis 客户端命令它有以下三个实现类分别是Jedis、JedisCluster、ShardedJedis。Storm中主要使用前两种实现类具体调用哪一个实现类来执行命令由传入的是jedisPoolConfig还是jedisClusterConfig来决定。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-jedicCommands.png"/> </div>
### 3.4 RedisMapper 和 TupleMapper
RedisMapper 和 TupleMapper 定义了 tuple 和 Redis 中的数据如何进行映射转换。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/storm-Redis-Mapper.png"/> </div>
#### 1. TupleMapper
TupleMapper 主要定义了两个方法:
+ getKeyFromTuple(ITuple tuple) 从tuple中获取哪个字段作为Key
+ getValueFromTuple(ITuple tuple)从tuple中获取哪个字段作为Value
#### 2. RedisMapper
定义了获取数据类型的方法`getDataTypeDescription()`,RedisDataTypeDescription中RedisDataType枚举类定义了所有可用的Redis数据类型
```java
// Excerpt: enumerates every Redis data structure the mappers can target.
public class RedisDataTypeDescription implements Serializable {
public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
......
}
```
#### 3. RedisStoreMapper
RedisStoreMapper继承TupleMapper和RedisMapper接口用于存储数据时没有定义额外方法。
#### 4. RedisLookupMapper
RedisLookupMapper继承TupleMapper和RedisMapper接口
+ 定义了declareOutputFields方法声明输出的字段。
+ 定义了toTuple方法将查询结果组装为Storm的Values的集合并用于发送。
下面的例子表示从输入`Tuple`的获取`word`字段作为key使用`RedisLookupBolt`进行查询后将key和查询结果value组装为values并发送到下一个处理单元。
```java
/**
 * Lookup mapper: uses the input tuple's "word" field as the field to query
 * under the "wordCount" hash, then packs (word, lookupResult) into a Values
 * list emitted on the ("wordName", "count") output fields.
 */
class WordCountRedisLookupMapper implements RedisLookupMapper {

    private static final String HASH_KEY = "wordCount";

    // This mapper always targets the HASH structure under HASH_KEY
    private final RedisDataTypeDescription description =
            new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, HASH_KEY);

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        // Pair the queried key with its lookup result for downstream emission
        List<Values> result = Lists.newArrayList();
        result.add(new Values(getKeyFromTuple(input), value));
        return result;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        // Lookups only need a key; there is no value to extract from the tuple
        return null;
    }
}
```
#### 5. RedisFilterMapper
RedisFilterMapper继承TupleMapper和RedisMapper接口用于查询数据时定义了declareOutputFields方法声明输出的字段。如下面的实现
```java
// Declares the fields that tuples passing the filter are re-emitted with
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName", "count"));
}
```
## 四、自定义RedisBolt实现词频统计
### 4.1 实现原理
自定义RedisBolt主要利用Redis中哈希结构的hincrby key field命令进行词频统计。在Redis中`hincrby`的执行效果如下如果在执行时字段不存在则在执行操作之前将值设置为0。通过这个命令可以非常轻松的实现词频统计功能。
```shell
redis> HSET myhash field 5
(integer) 1
redis> HINCRBY myhash field 1
(integer) 6
redis> HINCRBY myhash field -1
(integer) 5
redis> HINCRBY myhash field -10
(integer) -5
redis>
```
### 4.2 项目结构
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/CustomRedisCountApp.png"/> </div>
### 4.3 自定义RedisBolt的代码实现
```java
/**
 * Custom Redis bolt that performs word counting directly in Redis via the
 * hash command HINCRBY (a missing field is treated as 0 before incrementing).
 */
public class RedisCountStoreBolt extends AbstractRedisBolt {

    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;
        RedisDataTypeDescription desc = storeMapper.getDataTypeDescription();
        this.dataType = desc.getDataType();
        this.additionalKey = desc.getAdditionalKey();
    }

    @Override
    protected void process(Tuple tuple) {
        String field = storeMapper.getKeyFromTuple(tuple);
        String delta = storeMapper.getValueFromTuple(tuple);
        JedisCommands commands = null;
        try {
            commands = getInstance();
            // Only the HASH type supports HINCRBY-based counting
            if (dataType != RedisDataTypeDescription.RedisDataType.HASH) {
                throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
            }
            commands.hincrBy(additionalKey, field, Long.valueOf(delta));
            collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        } finally {
            // Always return the borrowed Jedis instance to the container
            returnInstance(commands);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing downstream
    }
}
```
### 4.4 CustomRedisCountApp
```java
/**
 * Word-count topology built on the custom RedisCountStoreBolt.
 */
public class CustomRedisCountApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String STORE_BOLT = "storeBolt";

    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        // Redis connection pool used by the counting store bolt
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .build();
        RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, new WordCountStoreMapper());

        // Wire up: spout -> split -> count-and-store (counting happens inside Redis)
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);

        // A "cluster" argument submits to a remote cluster; otherwise run locally
        if (args.length > 0 && "cluster".equals(args[0])) {
            try {
                StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            new LocalCluster().submitTopology("LocalCustomRedisCountApp", new Config(), builder.createTopology());
        }
    }
}
```
## 参考资料
1. [Storm Redis Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html)