diff --git a/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java b/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java
index 159d883..1505e47 100644
--- a/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java
+++ b/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java
@@ -14,7 +14,7 @@ import java.util.List;
public class HBaseUtilsTest {
- private static final String TABLE_NAME = "class";
+ private static final String TABLE_NAME = "WordCount";
private static final String TEACHER = "teacher";
private static final String STUDENT = "student";
@@ -67,7 +67,7 @@ public class HBaseUtilsTest {
ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME);
if (scanner != null) {
scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes
- .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))));
+ .toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")))));
scanner.close();
}
}
diff --git a/code/Storm/storm-hbase-integration/pom.xml b/code/Storm/storm-hbase-integration/pom.xml
new file mode 100644
index 0000000..c0446a9
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/pom.xml
@@ -0,0 +1,57 @@
+
+
+ 4.0.0
+
+ com.heibaiying
+ storm-hbase-integration
+ 1.0
+
+
+ UTF-8
+ 1.2.2
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/resources/assembly.xml
+
+
+
+ com.heibaiying.wordcount.ClusterWordCountApp
+
+
+
+
+
+
+
+
+
+
+ org.apache.storm
+ storm-core
+ ${storm.version}
+
+
+
+ org.apache.storm
+ storm-hbase
+ ${storm.version}
+
+
+
+
\ No newline at end of file
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
new file mode 100644
index 0000000..3844b63
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
@@ -0,0 +1,78 @@
+package com.heibaiying;
+
+import com.heibaiying.component.DataSourceSpout;
+import com.heibaiying.component.SplitBolt;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.hbase.bolt.HBaseBolt;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 进行词频统计 并将统计结果存储到HBase中
+ *
+ * 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true
+ * hdfs://hadoop001:8020/hbase
+ */
+public class WordCountToHBaseApp {
+
+ private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
+ private static final String SPLIT_BOLT = "splitBolt";
+ private static final String COUNT_BOLT = "countBolt";
+ private static final String HBASE_BOLT = "hbaseBolt";
+
+ public static void main(String[] args) {
+
+ // storm的配置
+ Config config = new Config();
+
+ // HBase的配置
+ Map hbConf = new HashMap<>();
+ hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
+ hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");
+
+ // 将HBase的配置传入Storm的配置中
+ config.put("hbase.conf", hbConf);
+
+ // 定义流数据与HBase中数据的映射
+ SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+ .withRowKeyField("word")
+ .withColumnFields(new Fields("word"))
+ .withCounterFields(new Fields("count"))
+ .withColumnFamily("cf");
+
+ // 给HBaseBolt传入表名、数据映射关系、和HBase的配置信息
+ HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+ .withConfigKey("hbase.conf");
+
+ // 构建Topology
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
+ // split
+ builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
+ // save to HBase
+ builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
+
+
+ // 如果外部传参cluster则代表线上环境启动,否则代表本地启动
+ if (args.length > 1 && args[1].equals("cluster")) {
+ try {
+ StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("LocalWordCountToRedisApp",
+ config, builder.createTopology());
+ }
+ }
+}
\ No newline at end of file
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java
new file mode 100644
index 0000000..541caba
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/DataSourceSpout.java
@@ -0,0 +1,53 @@
+package com.heibaiying.component;
+
+import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.*;
+
+
+/**
+ * 产生词频样本的数据源
+ */
+public class DataSourceSpout extends BaseRichSpout {
+
+ private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
+
+ private SpoutOutputCollector spoutOutputCollector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ // 模拟产生数据
+ String lineData = productData();
+ spoutOutputCollector.emit(new Values(lineData));
+ Utils.sleep(1000);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("line"));
+ }
+
+
+ /**
+ * 模拟数据
+ */
+ private String productData() {
+ Collections.shuffle(list);
+ Random random = new Random();
+ int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
+ return StringUtils.join(list.toArray(), "\t", 0, endIndex);
+ }
+
+}
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
new file mode 100644
index 0000000..b315f11
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
@@ -0,0 +1,40 @@
+package com.heibaiying.component;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+/**
+ * 将每行数据按照指定分隔符进行拆分
+ */
+public class SplitBolt extends BaseRichBolt {
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String line = input.getStringByField("line");
+ String[] words = line.split("\t");
+ for (String word : words) {
+ collector.emit(tuple(word, 1));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+}
diff --git a/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml b/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml
new file mode 100644
index 0000000..dec0017
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml
@@ -0,0 +1,25 @@
+
+
+ with-dependencies
+
+
+
+ jar
+
+
+ false
+
+
+ /
+ true
+ true
+ runtime
+
+
+ org.apache.storm:storm-core
+
+
+
+
\ No newline at end of file
diff --git a/code/Storm/storm-hdfs-integration/pom.xml b/code/Storm/storm-hdfs-integration/pom.xml
new file mode 100644
index 0000000..5d1ad12
--- /dev/null
+++ b/code/Storm/storm-hdfs-integration/pom.xml
@@ -0,0 +1,110 @@
+
+
+ 4.0.0
+
+ com.heibaiying
+ storm-hdfs-integration
+ 1.0
+
+
+ UTF-8
+ 1.2.2
+
+
+
+
+ cloudera
+ https://repository.cloudera.com/artifactory/cloudera-repos/
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+ true
+
+
+ *:*
+
+ org.apache.storm:storm-core
+
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.storm
+ storm-core
+ ${storm.version}
+
+
+
+ org.apache.storm
+ storm-hdfs
+ ${storm.version}
+
+
+ org.apache.hadoop
+ hadoop-client
+ 2.6.0-cdh5.15.2
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ 2.6.0-cdh5.15.2
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java
new file mode 100644
index 0000000..ff9d139
--- /dev/null
+++ b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java
@@ -0,0 +1,77 @@
+package com.heibaiying;
+
+import com.heibaiying.component.DataSourceSpout;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * 进行词频统计 并将统计结果存储到HDFS中
+ *
+ * hdfs://hadoopp001:8020 path
+ */
+public class WordCountToHdfsApp {
+
+ private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
+ private static final String HDFS_BOLT = "hdfsBolt";
+
+ public static void main(String[] args) {
+
+ // 定义存储文本的分隔符
+ RecordFormat format = new DelimitedRecordFormat()
+ .withFieldDelimiter("|");
+
+ // 同步策略: 每100个tuples之后就会把数据从缓存刷新到HDFS中
+ SyncPolicy syncPolicy = new CountSyncPolicy(100);
+
+ // 文件策略: 每个文件大小上限1M,超过限定时,创建新文件并继续写入
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
+
+ // 定义完整路径
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/storm-hdfs/");
+
+ // 定义HdfsBolt
+ HdfsBolt hdfsBolt = new HdfsBolt()
+ .withFsUrl("hdfs://hadoop001:8020")
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy);
+
+
+ // 构建Topology
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
+ // save to HBase
+ builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);
+
+
+ // 如果外部传参cluster则代表线上环境启动,否则代表本地启动
+ if (args.length > 0 && args[0].equals("cluster")) {
+ try {
+ StormSubmitter.submitTopology("ClusterWordCountToHdfsApp", new Config(), builder.createTopology());
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("LocalWordCountToHdfsApp",
+ new Config(), builder.createTopology());
+ }
+ }
+}
diff --git a/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/component/DataSourceSpout.java b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/component/DataSourceSpout.java
new file mode 100644
index 0000000..541caba
--- /dev/null
+++ b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/component/DataSourceSpout.java
@@ -0,0 +1,53 @@
+package com.heibaiying.component;
+
+import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.*;
+
+
+/**
+ * 产生词频样本的数据源
+ */
+public class DataSourceSpout extends BaseRichSpout {
+
+ private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
+
+ private SpoutOutputCollector spoutOutputCollector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ // 模拟产生数据
+ String lineData = productData();
+ spoutOutputCollector.emit(new Values(lineData));
+ Utils.sleep(1000);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("line"));
+ }
+
+
+ /**
+ * 模拟数据
+ */
+ private String productData() {
+ Collections.shuffle(list);
+ Random random = new Random();
+ int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
+ return StringUtils.join(list.toArray(), "\t", 0, endIndex);
+ }
+
+}
diff --git a/code/Storm/storm-redis-integration/pom.xml b/code/Storm/storm-redis-integration/pom.xml
index 0b641e6..04c43b4 100644
--- a/code/Storm/storm-redis-integration/pom.xml
+++ b/code/Storm/storm-redis-integration/pom.xml
@@ -10,7 +10,7 @@
UTF-8
- 1.2.0
+ 1.2.2
diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java
index db0c9c6..9c69b48 100644
--- a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java
+++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/CustomRedisCountApp.java
@@ -18,22 +18,26 @@ import org.apache.storm.topology.TopologyBuilder;
*/
public class CustomRedisCountApp {
+ private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
+ private static final String SPLIT_BOLT = "splitBolt";
+ private static final String STORE_BOLT = "storeBolt";
+
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("dataSourceSpout", new DataSourceSpout());
+ builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
- builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout");
+ builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// save to redis and count
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
- builder.setBolt("storeBolt", countStoreBolt).shuffleGrouping("splitBolt");
+ builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);
- // 如果外部传参cluster则代表线上环境启动否则代表本地启动
+ // 如果外部传参cluster则代表线上环境启动,否则代表本地启动
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
index 170ca40..c1591d5 100644
--- a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
+++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
@@ -17,29 +17,35 @@ import org.apache.storm.topology.TopologyBuilder;
/**
* 进行词频统计 并将统计结果存储到Redis中
- *
+ *
* 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true
- * 提交Topology到集群: storm jar /usr/appjar/storm-redis-integration-1.0-jar-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster
+ * 提交Topology到集群: storm jar /usr/appjar/storm-redis-integration-1.0-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster
* 停止Topology: storm kill ClusterWordCountApp -w 3
*/
public class WordCountToRedisApp {
+ private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
+ private static final String SPLIT_BOLT = "splitBolt";
+ private static final String COUNT_BOLT = "countBolt";
+ private static final String STORE_BOLT = "storeBolt";
+
+ //在实际开发中这些参数可以将通过外部传入 使得程序更加灵活
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("dataSourceSpout", new DataSourceSpout());
+ builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
- builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("dataSourceSpout");
+ builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
- builder.setBolt("countBolt", new CountBolt()).shuffleGrouping("splitBolt");
+ builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
// save to redis
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
- builder.setBolt("storeBolt", storeBolt).shuffleGrouping("countBolt");
+ builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);
// 如果外部传参cluster则代表线上环境启动否则代表本地启动
if (args.length > 0 && args[0].equals("cluster")) {
diff --git a/code/Storm/storm-redis-integration/src/main/resources/assembly.xml b/code/Storm/storm-redis-integration/src/main/resources/assembly.xml
index 3c38fe4..dec0017 100644
--- a/code/Storm/storm-redis-integration/src/main/resources/assembly.xml
+++ b/code/Storm/storm-redis-integration/src/main/resources/assembly.xml
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
- jar-with-dependencies
+ with-dependencies