storm整合redis代码实现
This commit is contained in:
parent
8a35586f38
commit
959dd95995
57
code/Storm/storm-redis-integration/pom.xml
Normal file
57
code/Storm/storm-redis-integration/pom.xml
Normal file
@ -0,0 +1,57 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>storm-redis-integration</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<storm.version>1.2.0</storm.version>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>8</source>
|
||||
<target>8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<descriptors>
|
||||
<descriptor>src/main/resources/assembly.xml</descriptor>
|
||||
</descriptors>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.storm</groupId>
|
||||
<artifactId>storm-core</artifactId>
|
||||
<version>${storm.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.storm</groupId>
|
||||
<artifactId>storm-redis</artifactId>
|
||||
<version>${storm.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
</project>
|
@ -0,0 +1,45 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import com.heibaiying.component.*;
|
||||
import org.apache.storm.Config;
|
||||
import org.apache.storm.LocalCluster;
|
||||
import org.apache.storm.StormSubmitter;
|
||||
import org.apache.storm.generated.AlreadyAliveException;
|
||||
import org.apache.storm.generated.AuthorizationException;
|
||||
import org.apache.storm.generated.InvalidTopologyException;
|
||||
import org.apache.storm.redis.bolt.RedisStoreBolt;
|
||||
import org.apache.storm.redis.common.config.JedisPoolConfig;
|
||||
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
|
||||
import org.apache.storm.topology.TopologyBuilder;
|
||||
|
||||
public class CustomRedisCountApp {
|
||||
|
||||
private static final String REDIS_HOST = "192.168.200.226";
|
||||
private static final int REDIS_PORT = 6379;
|
||||
|
||||
public static void main(String[] args) {
|
||||
TopologyBuilder builder = new TopologyBuilder();
|
||||
builder.setSpout("dataSourceSpout", new DataSourceSpout());
|
||||
// split
|
||||
builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout");
|
||||
// save to redis and count
|
||||
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
|
||||
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
|
||||
RedisStoreMapper storeMapper = new WordCountStoreMapper();
|
||||
RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
|
||||
builder.setBolt("storeBolt", countStoreBolt).shuffleGrouping("splitBolt");
|
||||
|
||||
// 如果外部传参cluster则代表线上环境启动否则代表本地启动
|
||||
if (args.length > 0 && args[0].equals("cluster")) {
|
||||
try {
|
||||
StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
|
||||
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
LocalCluster cluster = new LocalCluster();
|
||||
cluster.submitTopology("LocalCustomRedisCountApp",
|
||||
new Config(), builder.createTopology());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import com.heibaiying.component.CountBolt;
|
||||
import com.heibaiying.component.DataSourceSpout;
|
||||
import com.heibaiying.component.SplitBolt;
|
||||
import com.heibaiying.component.WordCountStoreMapper;
|
||||
import org.apache.storm.Config;
|
||||
import org.apache.storm.LocalCluster;
|
||||
import org.apache.storm.StormSubmitter;
|
||||
import org.apache.storm.generated.AlreadyAliveException;
|
||||
import org.apache.storm.generated.AuthorizationException;
|
||||
import org.apache.storm.generated.InvalidTopologyException;
|
||||
import org.apache.storm.redis.bolt.RedisStoreBolt;
|
||||
import org.apache.storm.redis.common.config.JedisPoolConfig;
|
||||
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
|
||||
import org.apache.storm.topology.TopologyBuilder;
|
||||
|
||||
/**
|
||||
* 进行词频统计 并将统计结果存储到Redis中
|
||||
*
|
||||
* 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true
|
||||
* 提交Topology到集群: storm jar /usr/appjar/storm-redis-integration-1.0-jar-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster
|
||||
* 停止Topology: storm kill ClusterWordCountApp -w 3
|
||||
*/
|
||||
public class WordCountToRedisApp {
|
||||
|
||||
private static final String REDIS_HOST = "192.168.200.226";
|
||||
private static final int REDIS_PORT = 6379;
|
||||
|
||||
public static void main(String[] args) {
|
||||
TopologyBuilder builder = new TopologyBuilder();
|
||||
builder.setSpout("dataSourceSpout", new DataSourceSpout());
|
||||
// split
|
||||
builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout");
|
||||
// count
|
||||
builder.setBolt("countBolt", new CountBolt()).shuffleGrouping("splitBolt");
|
||||
// save to redis
|
||||
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
|
||||
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
|
||||
RedisStoreMapper storeMapper = new WordCountStoreMapper();
|
||||
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
|
||||
builder.setBolt("storeBolt", storeBolt).shuffleGrouping("countBolt");
|
||||
|
||||
// 如果外部传参cluster则代表线上环境启动否则代表本地启动
|
||||
if (args.length > 0 && args[0].equals("cluster")) {
|
||||
try {
|
||||
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
|
||||
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
LocalCluster cluster = new LocalCluster();
|
||||
cluster.submitTopology("LocalWordCountToRedisApp",
|
||||
new Config(), builder.createTopology());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package com.heibaiying.component;
|
||||
|
||||
import org.apache.storm.task.OutputCollector;
|
||||
import org.apache.storm.task.TopologyContext;
|
||||
import org.apache.storm.topology.OutputFieldsDeclarer;
|
||||
import org.apache.storm.topology.base.BaseRichBolt;
|
||||
import org.apache.storm.tuple.Fields;
|
||||
import org.apache.storm.tuple.Tuple;
|
||||
import org.apache.storm.tuple.Values;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class CountBolt extends BaseRichBolt {
|
||||
|
||||
private Map<String, Integer> counts = new HashMap<>();
|
||||
|
||||
private OutputCollector collector;
|
||||
|
||||
|
||||
@Override
|
||||
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
|
||||
this.collector=collector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Tuple input) {
|
||||
String word = input.getStringByField("word");
|
||||
Integer count = counts.get(word);
|
||||
if (count == null) {
|
||||
count = 0;
|
||||
}
|
||||
count++;
|
||||
counts.put(word, count);
|
||||
// 输出
|
||||
collector.emit(new Values(word, String.valueOf(count)));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
||||
declarer.declare(new Fields("word", "count"));
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package com.heibaiying.component;
|
||||
|
||||
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
|
||||
import org.apache.storm.spout.SpoutOutputCollector;
|
||||
import org.apache.storm.task.TopologyContext;
|
||||
import org.apache.storm.topology.OutputFieldsDeclarer;
|
||||
import org.apache.storm.topology.base.BaseRichSpout;
|
||||
import org.apache.storm.tuple.Fields;
|
||||
import org.apache.storm.tuple.Values;
|
||||
import org.apache.storm.utils.Utils;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class DataSourceSpout extends BaseRichSpout {
|
||||
|
||||
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
|
||||
|
||||
private SpoutOutputCollector spoutOutputCollector;
|
||||
|
||||
@Override
|
||||
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
|
||||
this.spoutOutputCollector = spoutOutputCollector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nextTuple() {
|
||||
// 模拟产生数据
|
||||
String lineData = productData();
|
||||
spoutOutputCollector.emit(new Values(lineData));
|
||||
Utils.sleep(1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
|
||||
outputFieldsDeclarer.declare(new Fields("line"));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 模拟数据
|
||||
*/
|
||||
private String productData() {
|
||||
Collections.shuffle(list);
|
||||
Random random = new Random();
|
||||
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
|
||||
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
package com.heibaiying.component;
|
||||
|
||||
import org.apache.storm.redis.bolt.AbstractRedisBolt;
|
||||
import org.apache.storm.redis.common.config.JedisPoolConfig;
|
||||
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
|
||||
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
|
||||
import org.apache.storm.topology.OutputFieldsDeclarer;
|
||||
import org.apache.storm.tuple.Tuple;
|
||||
import redis.clients.jedis.JedisCommands;
|
||||
|
||||
public class RedisCountStoreBolt extends AbstractRedisBolt {
|
||||
|
||||
private final RedisStoreMapper storeMapper;
|
||||
private final RedisDataTypeDescription.RedisDataType dataType;
|
||||
private final String additionalKey;
|
||||
|
||||
public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
|
||||
super(config);
|
||||
this.storeMapper = storeMapper;
|
||||
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
|
||||
this.dataType = dataTypeDescription.getDataType();
|
||||
this.additionalKey = dataTypeDescription.getAdditionalKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process(Tuple tuple) {
|
||||
String key = storeMapper.getKeyFromTuple(tuple);
|
||||
String value = storeMapper.getValueFromTuple(tuple);
|
||||
|
||||
JedisCommands jedisCommand = null;
|
||||
try {
|
||||
jedisCommand = getInstance();
|
||||
if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
|
||||
jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
|
||||
}
|
||||
|
||||
collector.ack(tuple);
|
||||
} catch (Exception e) {
|
||||
this.collector.reportError(e);
|
||||
this.collector.fail(tuple);
|
||||
} finally {
|
||||
returnInstance(jedisCommand);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package com.heibaiying.component;
|
||||
|
||||
import org.apache.storm.task.OutputCollector;
|
||||
import org.apache.storm.task.TopologyContext;
|
||||
import org.apache.storm.topology.OutputFieldsDeclarer;
|
||||
import org.apache.storm.topology.base.BaseRichBolt;
|
||||
import org.apache.storm.tuple.Fields;
|
||||
import org.apache.storm.tuple.Tuple;
|
||||
import org.apache.storm.tuple.Values;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class SplitBolt extends BaseRichBolt {
|
||||
|
||||
private OutputCollector collector;
|
||||
|
||||
@Override
|
||||
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
|
||||
this.collector = collector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Tuple input) {
|
||||
String line = input.getStringByField("line");
|
||||
String[] words = line.split("\t");
|
||||
for (String word : words) {
|
||||
collector.emit(new Values(word, String.valueOf(1)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
||||
declarer.declare(new Fields("word", "count"));
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package com.heibaiying.component;
|
||||
|
||||
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
|
||||
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
|
||||
import org.apache.storm.tuple.ITuple;
|
||||
|
||||
public class WordCountStoreMapper implements RedisStoreMapper {
|
||||
private RedisDataTypeDescription description;
|
||||
private final String hashKey = "wordCount";
|
||||
|
||||
public WordCountStoreMapper() {
|
||||
description = new RedisDataTypeDescription(
|
||||
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisDataTypeDescription getDataTypeDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeyFromTuple(ITuple tuple) {
|
||||
return tuple.getStringByField("word");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValueFromTuple(ITuple tuple) {
|
||||
return tuple.getStringByField("count");
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
|
||||
|
||||
<id>jar-with-dependencies</id>
|
||||
|
||||
<!--指明打包方式-->
|
||||
<formats>
|
||||
<format>jar</format>
|
||||
</formats>
|
||||
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<outputDirectory>/</outputDirectory>
|
||||
<useProjectArtifact>true</useProjectArtifact>
|
||||
<unpack>true</unpack>
|
||||
<scope>runtime</scope>
|
||||
<!--排除storm环境中已经提供的storm-core-->
|
||||
<excludes>
|
||||
<exclude>org.apache.storm:storm-core</exclude>
|
||||
</excludes>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</assembly>
|
@ -58,7 +58,7 @@ export PATH=$STORM_HOME/bin:$PATH
|
||||
|
||||
### 4.启动相关进程
|
||||
|
||||
因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`$STORM_HOME/bin`目录下,命令如下:
|
||||
因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`${STORM_HOME}/bin`目录下,命令如下:
|
||||
|
||||
```shell
|
||||
# 启动zookeeper
|
||||
|
BIN
pictures/store-redis-manager.png
Normal file
BIN
pictures/store-redis-manager.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 19 KiB |
Loading…
x
Reference in New Issue
Block a user