diff --git a/code/Storm/storm-redis-integration/pom.xml b/code/Storm/storm-redis-integration/pom.xml new file mode 100644 index 0000000..0b641e6 --- /dev/null +++ b/code/Storm/storm-redis-integration/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + com.heibaiying + storm-redis-integration + 1.0 + + + UTF-8 + 1.2.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + maven-assembly-plugin + + + src/main/resources/assembly.xml + + + + com.heibaiying.wordcount.ClusterWordCountApp + + + + + + + + + + + org.apache.storm + storm-core + ${storm.version} + + + org.apache.storm + storm-redis + ${storm.version} + + + + + \ No newline at end of file diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java new file mode 100644 index 0000000..c3986b5 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java @@ -0,0 +1,45 @@ +package com.heibaiying; + +import com.heibaiying.component.*; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.redis.bolt.RedisStoreBolt; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; +import org.apache.storm.topology.TopologyBuilder; + +public class CustomRedisCountApp { + + private static final String REDIS_HOST = "192.168.200.226"; + private static final int REDIS_PORT = 6379; + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("dataSourceSpout", new DataSourceSpout()); + // split + builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout"); + // save to redis and count + JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper); + builder.setBolt("storeBolt", countStoreBolt).shuffleGrouping("splitBolt"); + + // 如果外部传参cluster则代表线上环境启动否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalCustomRedisCountApp", + new Config(), builder.createTopology()); + } + } +} diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java new file mode 100644 index 0000000..170ca40 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java @@ -0,0 +1,57 @@ +package com.heibaiying; + +import com.heibaiying.component.CountBolt; +import com.heibaiying.component.DataSourceSpout; +import com.heibaiying.component.SplitBolt; +import com.heibaiying.component.WordCountStoreMapper; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.redis.bolt.RedisStoreBolt; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; +import org.apache.storm.topology.TopologyBuilder; + +/** + * 进行词频统计 并将统计结果存储到Redis中 + * + * 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true + * 提交Topology到集群: storm jar /usr/appjar/storm-redis-integration-1.0-jar-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster + * 停止Topology: storm kill ClusterWordCountApp -w 3 + */ +public class WordCountToRedisApp { + + private static final String REDIS_HOST = "192.168.200.226"; + private static final int REDIS_PORT = 6379; + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("dataSourceSpout", new DataSourceSpout()); + // split + builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout"); + // count + builder.setBolt("countBolt", new CountBolt()).shuffleGrouping("splitBolt"); + // save to redis + JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); + builder.setBolt("storeBolt", storeBolt).shuffleGrouping("countBolt"); + + // 如果外部传参cluster则代表线上环境启动否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWordCountToRedisApp", + new Config(), builder.createTopology()); + } + } +} diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/CountBolt.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/CountBolt.java new file mode 100644 index 0000000..2884315 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/CountBolt.java @@ -0,0 +1,44 @@ +package com.heibaiying.component; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +public class CountBolt extends BaseRichBolt { + + private Map counts = new HashMap<>(); + + private OutputCollector collector; + + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + @Override + public void execute(Tuple input) { + String word = input.getStringByField("word"); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + // 输出 + collector.emit(new Values(word, String.valueOf(count))); + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java new file mode 100644 index 0000000..c4a351f --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java @@ -0,0 +1,49 @@ +package com.heibaiying.component; + +import org.apache.storm.shade.org.apache.commons.lang.StringUtils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.*; + +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/RedisCountStoreBolt.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/RedisCountStoreBolt.java new file mode 100644 index 0000000..9a5cfd1 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/RedisCountStoreBolt.java @@ -0,0 +1,52 @@ +package com.heibaiying.component; + +import org.apache.storm.redis.bolt.AbstractRedisBolt; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import redis.clients.jedis.JedisCommands; + +public class RedisCountStoreBolt extends AbstractRedisBolt { + + private final RedisStoreMapper storeMapper; + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + @Override + protected void process(Tuple tuple) { + String key = storeMapper.getKeyFromTuple(tuple); + String value = storeMapper.getValueFromTuple(tuple); + + JedisCommands jedisCommand = null; + try { + jedisCommand = getInstance(); + if (dataType == RedisDataTypeDescription.RedisDataType.HASH) { + jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value)); + } else { + throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType); + } + + collector.ack(tuple); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(tuple); + } finally { + returnInstance(jedisCommand); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} \ No newline at end of file diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/SplitBolt.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/SplitBolt.java new file mode 100644 index 0000000..d92ca07 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/SplitBolt.java @@ -0,0 +1,35 @@ +package com.heibaiying.component; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +public class SplitBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String line = input.getStringByField("line"); + String[] words = line.split("\t"); + for (String word : words) { + collector.emit(new Values(word, String.valueOf(1))); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java new file mode 100644 index 0000000..21e4ab3 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java @@ -0,0 +1,30 @@ +package com.heibaiying.component; + +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; +import org.apache.storm.tuple.ITuple; + +public class WordCountStoreMapper implements RedisStoreMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountStoreMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getStringByField("count"); + } +} diff --git a/code/Storm/storm-redis-integration/src/main/resources/assembly.xml b/code/Storm/storm-redis-integration/src/main/resources/assembly.xml new file mode 100644 index 0000000..3c38fe4 --- /dev/null +++ b/code/Storm/storm-redis-integration/src/main/resources/assembly.xml @@ -0,0 +1,25 @@ + + + jar-with-dependencies + + + + jar + + + false + + + / + true + true + runtime + + + org.apache.storm:storm-core + + + + \ No newline at end of file diff --git a/notes/installation/Storm单机版本环境搭建.md b/notes/installation/Storm单机版本环境搭建.md index 33fd993..579b3e2 100644 --- a/notes/installation/Storm单机版本环境搭建.md +++ b/notes/installation/Storm单机版本环境搭建.md @@ -58,7 +58,7 @@ export PATH=$STORM_HOME/bin:$PATH ### 4.启动相关进程 -因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`$STORM_HOME/bin`目录下,命令如下: +因为要启动多个进程,所以统一采用后台进程的方式启动,采用后台启动时需要进入到`${STORM_HOME}/bin`目录下,命令如下: ```shell # 启动zookeeper diff --git a/pictures/store-redis-manager.png b/pictures/store-redis-manager.png new file mode 100644 index 0000000..66f8a2e Binary files /dev/null and b/pictures/store-redis-manager.png differ