diff --git a/code/Storm/storm-hdfs-integration/pom.xml b/code/Storm/storm-hdfs-integration/pom.xml index f65e03c..643b3de 100644 --- a/code/Storm/storm-hdfs-integration/pom.xml +++ b/code/Storm/storm-hdfs-integration/pom.xml @@ -9,7 +9,6 @@ 1.0 - UTF-8 1.2.2 @@ -20,6 +19,53 @@ + + + org.apache.storm + storm-core + ${storm.version} + + + + org.apache.storm + storm-hdfs + ${storm.version} + + + org.apache.hadoop + hadoop-common + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-client + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-hdfs + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + + @@ -83,53 +129,4 @@ - - - - org.apache.storm - storm-core - ${storm.version} - - - - org.apache.storm - storm-hdfs - ${storm.version} - - - org.apache.hadoop - hadoop-common - 2.6.0-cdh5.15.2 - - - org.slf4j - slf4j-log4j12 - - - - - org.apache.hadoop - hadoop-client - 2.6.0-cdh5.15.2 - - - org.slf4j - slf4j-log4j12 - - - - - org.apache.hadoop - hadoop-hdfs - 2.6.0-cdh5.15.2 - - - org.slf4j - slf4j-log4j12 - - - - - - \ No newline at end of file diff --git a/code/Storm/storm-kafka-integration/pom.xml b/code/Storm/storm-kafka-integration/pom.xml index f1ffe16..48afea3 100644 --- a/code/Storm/storm-kafka-integration/pom.xml +++ b/code/Storm/storm-kafka-integration/pom.xml @@ -8,6 +8,29 @@ storm-kafka-integration 1.0 + + 1.2.2 + 2.2.0 + + + + + org.apache.storm + storm-core + ${storm.version} + + + org.apache.storm + storm-kafka-client + ${storm.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + @@ -68,27 +91,4 @@ - - 1.2.2 - 2.2.0 - - - - - org.apache.storm - storm-core - ${storm.version} - - - org.apache.storm - storm-kafka-client - ${storm.version} - - - org.apache.kafka - kafka-clients - ${kafka.version} - - - \ No newline at end of file diff --git a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java index 281824b..56814e3 100644 --- a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java +++ b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java @@ -14,7 +14,7 @@ import org.apache.storm.topology.TopologyBuilder; import java.util.Properties; /** - * 写入数据到Kafka的特定主题中 + * 写入数据到Kafka中 */ public class WritingToKafkaApp { diff --git a/code/Storm/storm-redis-integration/pom.xml b/code/Storm/storm-redis-integration/pom.xml index 52ee5b8..c3c91dd 100644 --- a/code/Storm/storm-redis-integration/pom.xml +++ b/code/Storm/storm-redis-integration/pom.xml @@ -9,10 +9,23 @@ 1.0 - UTF-8 1.2.2 + + + org.apache.storm + storm-core + ${storm.version} + + + org.apache.storm + storm-redis + ${storm.version} + + + + @@ -73,19 +86,4 @@ - - - - org.apache.storm - storm-core - ${storm.version} - - - org.apache.storm - storm-redis - ${storm.version} - - - - \ No newline at end of file diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java index 6343e9f..d75e568 100644 --- a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java +++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/component/WordCountStoreMapper.java @@ -5,7 +5,7 @@ import 
org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.tuple.ITuple;
 
 /**
- * 定义流数据与Redis中数据的映射关系
+ * 定义tuple与Redis中数据的映射关系
  */
 public class WordCountStoreMapper implements RedisStoreMapper {
     private RedisDataTypeDescription description;
diff --git a/notes/Storm多种打包方式对比分析.md b/notes/Storm多种打包方式对比分析.md
new file mode 100644
index 0000000..c74e8ed
--- /dev/null
+++ b/notes/Storm多种打包方式对比分析.md
@@ -0,0 +1,309 @@
+# Storm多种打包方式对比分析
+
+## 一、简介
+
+在将Storm Topology提交到服务器集群运行时,需要先将项目进行打包。本文主要对比分析各种打包方式,并对打包过程中需要注意的事项进行说明。主要打包方式有以下三种:
+
++ 第一种:不加任何插件,直接使用mvn package打包;
++ 第二种:使用maven-assembly-plugin插件进行打包;
++ 第三种:使用maven-shade-plugin进行打包。
+
+以下分别进行详细的说明。
+
+## 二、mvn package
+
+### 2.1 mvn package的局限
+
+不在POM中配置任何插件,直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。
+
+但如果项目中使用了第三方JAR包,就会出现问题,因为`mvn package`打包后的JAR中是不含依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。
+
+如果你想采用这种方式进行打包,但是又使用了第三方JAR包,有没有解决办法?答案是有的,这一点在官方文档的[Command Line Client](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Command-line-client.html)部分有所讲解,主要解决办法如下。
+
+### 2.2 解决办法
+
+在使用`storm jar topology-jar-path class ...`提交Topology并需要指定第三方依赖时,可以进行如下操作:
+
++ 如果第三方JAR包在本地,可以使用`--jars`指定;
++ 如果第三方JAR包在远程中央仓库,可以使用`--artifacts`指定,此时如果想要排除某些依赖,可以使用 `^` 符号;
++ 如果第三方JAR包在其他仓库,还需要使用`--artifactRepositories`指明仓库地址,库名和地址使用 `^` 符号分隔。
+
+以下是包含上面三种情况的一个例子:
+
+```shell
+./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
+```
+
+## 三、maven-assembly-plugin插件
+
+### 3.1 官方文档说明
+
+maven-assembly-plugin是官方文档中介绍的打包方法,以下表述来源于官方文档:[Running Topologies on a Production Cluster](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Running-topologies-on-a-production-cluster.html)
+
+> If you're using Maven, the [Maven Assembly Plugin](http://maven.apache.org/plugins/maven-assembly-plugin/) can do the packaging for you. Just add this to your pom.xml:
+>
+> ```xml
+> <plugin>
+>   <artifactId>maven-assembly-plugin</artifactId>
+>   <configuration>
+>     <descriptorRefs>
+>       <descriptorRef>jar-with-dependencies</descriptorRef>
+>     </descriptorRefs>
+>     <archive>
+>       <manifest>
+>         <mainClass>com.path.to.main.Class</mainClass>
+>       </manifest>
+>     </archive>
+>   </configuration>
+> </plugin>
+> ```
+>
+> Then run mvn assembly:assembly to get an appropriately packaged jar. Make sure you [exclude](http://maven.apache.org/plugins/maven-assembly-plugin/examples/single/including-and-excluding-artifacts.html) the Storm jars since the cluster already has Storm on the classpath.
+
+主要是两点:
+
+- 使用maven-assembly-plugin进行打包,因为maven-assembly-plugin会把所有的依赖一并打包到最后的JAR中;
+- 排除掉Storm集群环境中已经提供的Storm jars。
+
+maven-assembly-plugin的使用非常简单,只需要在POM.xml中引入即可,并在`<descriptorRef>`标签中指定打包格式为`jar-with-dependencies`。那么就剩下一个问题:如何排除Storm jars?
+
+### 3.2 排除Storm jars
+
+这里说明一下,`jar-with-dependencies`是Maven官方内置的一种打包格式,Maven官方文档[Pre-defined Descriptor Files](http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html)中有所说明:
+
+![jar-with-dependencies](D:\BigData-Notes\pictures\jar-with-dependencies.png)
+
+如果你想排除某个依赖,这里以排除`storm-core`为例,你可以在`jar-with-dependencies`的XML上进行修改。
+
+```xml
+<assembly>
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <excludes>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
+```
+
+### 3.3 最终配置
+
+采用maven-assembly-plugin进行打包时,最终的配置应该如下:
+
+#### 1. 引入插件
+
+在POM.xml中引入插件,并指定打包格式的配置文件`assembly.xml`(名称可自定义):
+
+```xml
+<build>
+    <plugins>
+        <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+                <descriptors>
+                    <descriptor>src/main/resources/assembly.xml</descriptor>
+                </descriptors>
+                <archive>
+                    <manifest>
+                        <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
+                    </manifest>
+                </archive>
+            </configuration>
+        </plugin>
+    </plugins>
+</build>
+```
+
+assembly.xml文件内容如下:
+
+```xml
+<assembly>
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <excludes>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
+```
+
+> 在配置文件中不仅可以排除依赖,还可以排除指定的文件,更多的配置规则可以参考官方文档:[Descriptor Format](http://maven.apache.org/plugins/maven-assembly-plugin/assembly.html#)
+
+#### 2. 打包命令
+
+采用maven-assembly-plugin进行打包时命令如下:
+
+```shell
+# mvn assembly:assembly
+```
+
+打包后会同时生成两个JAR包,其中后缀为`jar-with-dependencies`的是含有第三方依赖的JAR包,该后缀由`assembly.xml`中的`<id>`标签指定,可以自定义修改。提交该JAR到集群环境即可直接使用。
+
+![storm-jar](D:\BigData-Notes\pictures\storm-jar.png)
+
+## 四、maven-shade-plugin插件
+
+### 4.1 官方文档说明
+
+第三种方式是使用maven-shade-plugin进行打包。既然已经有了maven-assembly-plugin,为什么还需要maven-shade-plugin?这一点在官方文档中也有所说明,来自官方对HDFS整合讲解的章节[Storm HDFS Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html),原文如下:
+
+> When packaging your topology, it's important that you use the [maven-shade-plugin](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html) as opposed to the [maven-assembly-plugin](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html).
+>
+> The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme resolution.
+>
+> If you experience errors such as the following:
+>
+> ```
+> java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
+> ```
+>
+> it's an indication that your topology jar file isn't packaged properly.
+>
+> If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to create your topology jar.
+
+这里第一句就说得比较清晰:在集成HDFS的时候,必须使用maven-shade-plugin来代替maven-assembly-plugin,否则会抛出RuntimeException异常。
+
+采用maven-shade-plugin还有很多好处。比如你的工程依赖很多JAR包,而被依赖的JAR又会依赖其他的JAR包,当工程中依赖到不同版本的JAR,且这些JAR中存在相同名称的资源文件时,shade插件会尝试将所有资源文件合并打包,而不是像assembly一样执行覆盖操作。
+
+### 4.2 配置
+
+配置示例如下:
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <configuration>
+        <createDependencyReducedPom>true</createDependencyReducedPom>
+        <filters>
+            <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.sf</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.dsa</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                    <exclude>META-INF/*.rsa</exclude>
+                    <exclude>META-INF/*.EC</exclude>
+                    <exclude>META-INF/*.ec</exclude>
+                    <exclude>META-INF/MSFTSIG.SF</exclude>
+                    <exclude>META-INF/MSFTSIG.RSA</exclude>
+                </excludes>
+            </filter>
+        </filters>
+        <artifactSet>
+            <excludes>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </artifactSet>
+    </configuration>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass></mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+```
+
+配置说明:
+
+有些JAR包生成时,会使用jarsigner生成文件签名(完整性校验),签名分为两个文件存放在META-INF目录下:
+
++ a signature file, with a .SF extension;
++ a signature block file, with a .DSA, .RSA, or .EC extension;
+
+如果某些包存在重复引用,这可能会导致在打包时出现`Invalid signature file digest for Manifest main attributes`异常,所以在配置中排除这些签名文件。
+
+### 4.3 打包命令
+
+使用maven-shade-plugin进行打包时,打包命令和普通打包一样:
+
+```shell
+# mvn package
+```
+
+打包后会生成两个JAR包,提交到服务器集群时使用非original开头的JAR。
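+
+为了避免提交到集群后才发现依赖问题,打包完成后可以先在本地简单验证storm-core是否已经被排除。下面是一个检查示例(示例中的JAR文件名仅为示意,请以实际生成的文件名为准):
+
+```shell
+# 列出JAR包内容并查找defaults.yaml(该文件由storm-core提供)
+# 如果没有任何输出,说明storm-core已被正确排除;否则提交后会出现重复defaults.yaml的异常
+jar tf target/storm-hdfs-integration-1.0.jar | grep defaults.yaml
+```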
+ +![storm-jar2](D:\BigData-Notes\pictures\storm-jar2.png) + +## 五、结论 + +通过以上三种打包方式的详细介绍,这里给出最后的结论:**建议使用maven-shade-plugin插件进行打包**,因为其通用性最强,操作最简单,并且storm官方Github中给的所有[examples](https://github.com/apache/storm/tree/master/examples)都是采用的第三种打包方式。 + + + +## 六、打包注意事项 + +无论采用任何打包方式,都必须排除集群环境中已经提供的storm jars。这里比较典型的是storm-core,其在安装目录的lib目录下已经存在。 + +![storm-lib](D:\BigData-Notes\pictures\storm-lib.png) + + + +如果你不排除storm-core,通常会抛出下面的异常: + +```properties +Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/usr/app/apache-storm-1.2.2/lib/storm-core-1.2.2.jar!/defaults.yaml, jar:file:/usr/appjar/storm-hdfs-integration-1.0.jar!/defaults.yaml] + at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:384) + at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:428) + at org.apache.storm.utils.Utils.readStormConfig(Utils.java:464) + at org.apache.storm.utils.Utils.(Utils.java:178) + ... 39 more +``` + +![storm-jar-complie-error](D:\BigData-Notes\pictures\storm-jar-complie-error.png) \ No newline at end of file diff --git a/notes/Storm编程模型详解.md b/notes/Storm编程模型详解.md index 0b74a3a..3c01401 100644 --- a/notes/Storm编程模型详解.md +++ b/notes/Storm编程模型详解.md @@ -471,7 +471,7 @@ storm kill ClusterWordCountApp -w 3 ### 1. mvn package的局限性 -上面我们直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。 +上面我们没有在POM中配置任何插件,直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。 这时候可能大家会有疑惑,在我们的项目中不是使用了`storm-core`这个依赖吗?其实上面之所以我们能运行成功,是因为在Storm的集群环境中提供了这个JAR包,在安装目录的lib目录下: diff --git a/notes/Storm集成HBase和HDFS.md b/notes/Storm集成HBase和HDFS.md new file mode 100644 index 0000000..d424740 --- /dev/null +++ b/notes/Storm集成HBase和HDFS.md @@ -0,0 +1,472 @@ +# Storm集成HDFS和HBase + +## 一、Storm集成HDFS + +### 1.1 项目结构 + +> 本用例源码下载地址:[storm-hdfs-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hdfs-integration) + +### 1.2 项目主要依赖 + +项目主要依赖如下,有两个需要注意: + ++ 这里由于我服务器上安装的是CDH版本的Hadoop,在导入依赖时引入的也是CDH版本的依赖,需要使用``标签指定CDH的仓库地址; ++ `hadoop-common`、`hadoop-client`、`hadoop-hdfs`均需要排除`slf4j-log4j12`依赖,原因是`storm-core`中已经有该依赖,不排除的话有JAR包冲突的风险; + +```xml + + 1.2.2 + + + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + + + + org.apache.storm + storm-core + ${storm.version} + + + + org.apache.storm + storm-hdfs + ${storm.version} + + + org.apache.hadoop + hadoop-common + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-client + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-hdfs + 2.6.0-cdh5.15.2 + + + org.slf4j + slf4j-log4j12 + + + + +``` + +### 1.3 DataSourceSpout + +```java +/** + * 产生词频样本的数据源 + */ +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + 
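+        // 声明输出的tuple所包含的字段,这里只声明了一个名为line的字段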
outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} +``` + +产生的模拟数据格式如下: + +```properties +Spark HBase +Hive Flink Storm Hadoop HBase Spark +Flink +HBase Storm +HBase Hadoop Hive Flink +HBase Flink Hive Storm +Hive Flink Hadoop +HBase Hive +Hadoop Spark HBase Storm +``` + +### 1.4 将数据存储到HDFS + +这里HDFS的地址和数据存储路径均使用了硬编码,在实际开发中可以通过外部传参指定,这样程序更为灵活。 + +```java +public class DataToHdfsApp { + + private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; + private static final String HDFS_BOLT = "hdfsBolt"; + + public static void main(String[] args) { + + // 指定Hadoop的用户名 如果不指定,则在HDFS创建目录时候有可能抛出无权限的异常(RemoteException: Permission denied) + System.setProperty("HADOOP_USER_NAME", "root"); + + // 定义输出字段(Field)之间的分隔符 + RecordFormat format = new DelimitedRecordFormat() + .withFieldDelimiter("|"); + + // 同步策略: 每100个tuples之后就会把数据从缓存刷新到HDFS中 + SyncPolicy syncPolicy = new CountSyncPolicy(100); + + // 文件策略: 每个文件大小上限1M,超过限定时,创建新文件并继续写入 + FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB); + + // 定义存储路径 + FileNameFormat fileNameFormat = new DefaultFileNameFormat() + .withPath("/storm-hdfs/"); + + // 定义HdfsBolt + HdfsBolt hdfsBolt = new HdfsBolt() + .withFsUrl("hdfs://hadoop001:8020") + .withFileNameFormat(fileNameFormat) + .withRecordFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(syncPolicy); + + + // 构建Topology + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout()); + // save to HDFS + builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT); + + + // 如果外部传参cluster则代表线上环境启动,否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalDataToHdfsApp", + new Config(), builder.createTopology()); + } + } +} +``` + +### 1.5 启动测试 + +可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用`maven-shade-plugin`进行打包,打包命令如下: + +```shell +# mvn clean package -D maven.test.skip=true +``` + +运行后,数据会存储到HDFS的`/storm-hdfs`目录下。使用以下命令可以查看目录内容: + +```shell +# 查看目录内容 +hadoop fs -ls /storm-hdfs +# 监听文内容变化 +hadoop fs -tail -f /strom-hdfs/文件名 +``` + + + +![storm-hdfs-result](D:\BigData-Notes\pictures\storm-hdfs-result.png) + + + +## 二、Storm集成HBase + +### 2.1 项目结构 + +本用例源码下载地址:[storm-hbase-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hbase-integration) + +### 2.2 项目主要依赖 + +```xml + + 1.2.2 + + + + + + org.apache.storm + storm-core + ${storm.version} + + + + org.apache.storm + storm-hbase + ${storm.version} + + +``` + +### 2.3 DataSourceSpout + +```java +/** + * 产生词频样本的数据源 + */ +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData 
= productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} +``` + +产生的模拟数据格式如下: + +```properties +Spark HBase +Hive Flink Storm Hadoop HBase Spark +Flink +HBase Storm +HBase Hadoop Hive Flink +HBase Flink Hive Storm +Hive Flink Hadoop +HBase Hive +Hadoop Spark HBase Storm +``` + + + +### 2.4 SplitBolt + +```java +/** + * 将每行数据按照指定分隔符进行拆分 + */ +public class SplitBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String line = input.getStringByField("line"); + String[] words = line.split("\t"); + for (String word : words) { + collector.emit(tuple(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} +``` + +### 2.5 CountBolt + +```java +/** + * 进行词频统计 + */ +public class CountBolt extends BaseRichBolt { + + private Map counts = new HashMap<>(); + + private OutputCollector collector; + + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + @Override + public void execute(Tuple input) { + String word = input.getStringByField("word"); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + // 输出 + collector.emit(new Values(word, String.valueOf(count))); + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} +``` + +### 2.6 WordCountToHBaseApp + +```java +/** + * 进行词频统计 并将统计结果存储到HBase中 + */ +public class WordCountToHBaseApp { + + private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; + private static final String SPLIT_BOLT = "splitBolt"; + private static final String COUNT_BOLT = "countBolt"; + private static final String HBASE_BOLT = "hbaseBolt"; + + public static void main(String[] args) { + + // storm的配置 + Config config = new Config(); + + // HBase的配置 + Map hbConf = new HashMap<>(); + hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase"); + hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181"); + + // 将HBase的配置传入Storm的配置中 + config.put("hbase.conf", hbConf); + + // 定义流数据与HBase中数据的映射 + SimpleHBaseMapper mapper = new SimpleHBaseMapper() + .withRowKeyField("word") + .withColumnFields(new Fields("word","count")) + .withColumnFamily("info"); + + /* + * 给HBaseBolt传入表名、数据映射关系、和HBase的配置信息 + * 表需要预先创建: create 'WordCount','info' + */ + HBaseBolt hbase = new HBaseBolt("WordCount", mapper) + .withConfigKey("hbase.conf"); + + // 构建Topology + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1); + // split + builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT); + // count + builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT); + // save to HBase + builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT); + + 
+ // 如果外部传参cluster则代表线上环境启动,否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWordCountToRedisApp", + config, builder.createTopology()); + } + } +} +``` + +### 2.7 启动测试 + +可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用`maven-shade-plugin`进行打包,打包命令如下: + +```shell +# mvn clean package -D maven.test.skip=true +``` + +运行后,数据会存储到HBase的`WordCount`表中。使用以下命令查看表的内容: + +```shell +hbase > scan 'WordCount' +``` + +![storm-hbase-result](D:\BigData-Notes\pictures\storm-hbase-result.png) + + + +### 2.8 withCounterFields + +在上面的用例中我们是手动编码来实现词频统计,并将最后的结果存储到HBase中。其实也可以在构建`SimpleHBaseMapper`的时候通过`withCounterFields`指定count字段,被指定的字段会自动进行累加操作,这样也可以实现词频统计。需要注意的是withCounterFields指定的字段必须是Long类型,不能是String类型。 + +```java +SimpleHBaseMapper mapper = new SimpleHBaseMapper() + .withRowKeyField("word") + .withColumnFields(new Fields("word")) + .withCounterFields(new Fields("count")) + .withColumnFamily("cf"); +``` + diff --git a/notes/Storm集成Kakfa.md b/notes/Storm集成Kakfa.md new file mode 100644 index 0000000..20d7ed0 --- /dev/null +++ b/notes/Storm集成Kakfa.md @@ -0,0 +1,344 @@ +# Storm集成Kafka + +## 一、整合说明 + +Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下: + ++ [Storm Kafka Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html) : 主要是针对0.8.x版本的Kafka提供整合支持; ++ [Storm Kafka Integration (0.10.x+)]() : 包含Kafka 新版本的 consumer API,主要对Kafka 0.10.x +提供整合支持。 + +这里我服务端安装的Kafka版本为2.2.0(Released Mar 22, 2019) ,按照官方0.10.x+的整合文档进行整合,不适用于0.8.x版本的Kafka。 + +## 二、写入数据到Kafka + +### 2.1 项目结构 + + + +### 2.2 项目主要依赖 + +```xml + + 1.2.2 + 2.2.0 + + + + + org.apache.storm + storm-core + ${storm.version} + + + org.apache.storm + storm-kafka-client + ${storm.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + +``` + +### 2.3 DataSourceSpout + +```java +/** + * 产生词频样本的数据源 + */ +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} +``` + +产生的模拟数据格式如下: + +```properties +Spark HBase +Hive Flink Storm Hadoop HBase Spark +Flink +HBase Storm +HBase Hadoop Hive Flink +HBase Flink Hive Storm +Hive Flink Hadoop +HBase Hive +Hadoop Spark HBase Storm +``` + +### 2.4 WritingToKafkaApp + +```java +/** + * 写入数据到Kafka中 + */ +public class WritingToKafkaApp { + + private static final String BOOTSTRAP_SERVERS = "hadoop001:9092"; + private static final String TOPIC_NAME = "storm-topic"; + + public static void main(String[] args) { + + + TopologyBuilder 
builder = new TopologyBuilder(); + + // 定义Kafka生产者属性 + Properties props = new Properties(); + /* + * 指定broker的地址清单,清单里不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息。 + * 不过建议至少要提供两个broker的信息作为容错。 + */ + props.put("bootstrap.servers", BOOTSTRAP_SERVERS); + /* + * acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 + * acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + * acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + * acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + */ + props.put("acks", "1"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaBolt bolt = new KafkaBolt() + .withProducerProperties(props) + .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME)) + .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>()); + + builder.setSpout("sourceSpout", new DataSourceSpout(), 1); + builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout"); + + + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWritingToKafkaApp", + new Config(), builder.createTopology()); + } + } +} +``` + +### 2.5 测试准备工作 + +进行测试前需要启动Kakfa。 + +#### 1. 启动Kakfa + +Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的。 + +```shell +# zookeeper启动命令 +bin/zkServer.sh start + +# 内置zookeeper启动命令 +bin/zookeeper-server-start.sh config/zookeeper.properties +``` + +启动单节点kafka: + +```shell +# bin/kafka-server-start.sh config/server.properties +``` + +#### 2. 创建topic + +```shell +# 创建用于测试主题 +bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic + +# 查看所有主题 + bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092 +``` + +#### 3. 
启动一个消费者用于观察写入情况 + +```shell +# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning +``` + +### 2.6 测试 + +可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用`maven-shade-plugin`进行打包,打包命令如下: + +```shell +# mvn clean package -D maven.test.skip=true +``` + +启动后,消费者监听情况如下: + +![strom-kafka-consumer](D:\BigData-Notes\pictures\strom-kafka-consumer.png) + + + +## 三、从Kafka中读取数据 + +### 3.1 项目结构 + + + +### 3.2 ReadingFromKafkaApp + +```java +/** + * 从Kafka中读取数据 + */ +public class ReadingFromKafkaApp { + + private static final String BOOTSTRAP_SERVERS = "hadoop001:9092"; + private static final String TOPIC_NAME = "storm-topic"; + + public static void main(String[] args) { + + final TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1); + builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout"); + + // 如果外部传参cluster则代表线上环境启动,否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalReadingFromKafkaApp", + new Config(), builder.createTopology()); + } + } + + private static KafkaSpoutConfig getKafkaSpoutConfig(String bootstrapServers, String topic) { + return KafkaSpoutConfig.builder(bootstrapServers, topic) + // 除了分组ID,以下配置都是可选的。分组ID必须指定,否则会抛出InvalidGroupIdException异常 + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + // 定义重试策略 + .setRetry(getRetryService()) + // 定时提交偏移量的时间间隔,默认是15s + .setOffsetCommitPeriodMs(10_000) + .build(); + } + + // 定义重试策略 + private static KafkaSpoutRetryService getRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), + TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); + } +} + +``` + +### 3.3 LogConsoleBolt + +```java +/** + * 打印从Kafka中获取的数据 + */ +public class LogConsoleBolt extends BaseRichBolt { + + + private OutputCollector collector; + + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + public void execute(Tuple input) { + try { + String value = input.getStringByField("value"); + System.out.println("received from kafka : "+ value); + // 必须ack,否则会重复消费kafka中的消息 + collector.ack(input); + }catch (Exception e){ + e.printStackTrace(); + collector.fail(input); + } + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} +``` + +这里从`value`字段中获取kafka输出的值数据。 + +默认情况下,我们可以通过继承`RecordTranslator`接口定义了Kafka中Record与输出流之间的转换关系,可以在构建`KafkaSpoutConfig`的时候通过构造器或者`setRecordTranslator()`传入,并最后传递给具体的`KafkaSpout`。如果不指定的情况下,则默认使用内置的`DefaultRecordTranslator`,其源码如下,`FIELDS`中 定义了tuple中所有可用的字段: + +```java +public class DefaultRecordTranslator implements RecordTranslator { + private static final long serialVersionUID = -5782462870112305750L; + public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value"); + @Override + public List apply(ConsumerRecord record) { + return new Values(record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + + @Override + public Fields getFieldsFor(String stream) { + return FIELDS; + } + + @Override + public List streams() { + return 
DEFAULT_STREAM; + } +} +``` + +### 3.4 启动测试 + +这里启动一个生产者用于发送测试数据,启动命令如下: + +```shell +# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic +``` + +![storm-kafka-producer](D:\BigData-Notes\pictures\storm-kafka-producer.png) + +本地运行的项目接收到从Kafka发送过来的数据: + +![storm-kafka-receiver](D:\BigData-Notes\pictures\storm-kafka-receiver.png) \ No newline at end of file diff --git a/notes/Storm集成Redis详解.md b/notes/Storm集成Redis详解.md new file mode 100644 index 0000000..9a9516c --- /dev/null +++ b/notes/Storm集成Redis详解.md @@ -0,0 +1,625 @@ +# Storm 集成 Redis 详解 + +## 一、简介 + +storm-redis提供了Storm与Redis的集成支持,你只需要引入对应的依赖即可使用。Storm-redis使用Jedis为Redis客户端,提供了基本的Bolt实现, `RedisLookupBolt` and `RedisStoreBolt`。 + ++ RedisLookupBolt:从Redis中查询数据; ++ RedisStoreBolt:存储数据到Redis; ++ RedisFilterBolt : 查询符合条件的数据; + +`RedisLookupBolt`和`RedisStoreBolt`都继承自`AbstractRedisBolt`抽象类,我们也可以继承自该抽象类,然后按照我们自己的的业务逻辑进行功能的拓展。 + +```xml + + org.apache.storm + storm-redis + ${storm.version} + jar + +``` + +## 二、集成案例 + +### 2.1 项目结构 + +### 2.2 项目依赖 + +项目主要依赖如下: + +```xml + + 1.2.2 + + + + + org.apache.storm + storm-core + ${storm.version} + + + org.apache.storm + storm-redis + ${storm.version} + + +``` + +### 2.3 DataSourceSpout + +```java +/** + * 产生词频样本的数据源 + */ +public class DataSourceSpout extends BaseRichSpout { + + private List list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + + private SpoutOutputCollector spoutOutputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + // 模拟产生数据 + String lineData = productData(); + spoutOutputCollector.emit(new Values(lineData)); + Utils.sleep(1000); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("line")); + } + + + /** + * 模拟数据 + */ + private String productData() { + Collections.shuffle(list); + Random random = new Random(); + int endIndex = random.nextInt(list.size()) % (list.size()) + 1; + return StringUtils.join(list.toArray(), "\t", 0, endIndex); + } + +} +``` + +产生的模拟数据格式如下: + +```properties +Spark HBase +Hive Flink Storm Hadoop HBase Spark +Flink +HBase Storm +HBase Hadoop Hive Flink +HBase Flink Hive Storm +Hive Flink Hadoop +HBase Hive +Hadoop Spark HBase Storm +``` + +### 2.4 SplitBolt + +```java +/** + * 将每行数据按照指定分隔符进行拆分 + */ +public class SplitBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String line = input.getStringByField("line"); + String[] words = line.split("\t"); + for (String word : words) { + collector.emit(new Values(word, String.valueOf(1))); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} +``` + +### 2.5 CountBolt + +```java +/** + * 进行词频统计 + */ +public class CountBolt extends BaseRichBolt { + + private Map counts = new HashMap<>(); + + private OutputCollector collector; + + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector=collector; + } + + @Override + public void execute(Tuple input) { + String word = input.getStringByField("word"); + Integer count = counts.get(word); + if (count == 
null) { + count = 0; + } + count++; + counts.put(word, count); + // 输出 + collector.emit(new Values(word, String.valueOf(count))); + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } +} +``` + +### 2.6 WordCountStoreMapper + +实现RedisStoreMapper,并定义定义tuple与Redis中数据的映射关系,Redis存储的是Key/Value键值对,并且支持多种数据结构,你需要指定tuple中的那个字段为key,那个字段为value,并且存储为什么数据结构。 + +```java +/** + * 定义tuple与Redis中数据的映射关系 + */ +public class WordCountStoreMapper implements RedisStoreMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountStoreMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getStringByField("count"); + } +} +``` + +### 2.7 WordCountToRedisApp + +```java +/** + * 进行词频统计 并将统计结果存储到Redis中 + */ +public class WordCountToRedisApp { + + private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; + private static final String SPLIT_BOLT = "splitBolt"; + private static final String COUNT_BOLT = "countBolt"; + private static final String STORE_BOLT = "storeBolt"; + + //在实际开发中这些参数可以将通过外部传入 使得程序更加灵活 + private static final String REDIS_HOST = "192.168.200.226"; + private static final int REDIS_PORT = 6379; + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout()); + // split + builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT); + // count + builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT); + // save to redis + JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); + builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT); + + // 如果外部传参cluster则代表线上环境启动否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalWordCountToRedisApp", + new Config(), builder.createTopology()); + } + } +} +``` + +### 2.8 启动测试 + +可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用`maven-shade-plugin`进行打包,打包命令如下: + +```shell +# mvn clean package -D maven.test.skip=true +``` + +启动后,查看Redis中的数据: + +![store-redis-manager](D:\BigData-Notes\pictures\store-redis-manager.png) + + + +## 三、storm-redis 实现原理 + +### 3.1 AbstractRedisBolt + +`RedisLookupBolt`和`RedisStoreBolt`都继承自`AbstractRedisBolt`抽象类,和我们自定义实现Bolt一样,`AbstractRedisBolt`间接继承自`BaseRichBolt`。 + + + +![storm-abstractRedisBolt](D:\BigData-Notes\pictures\storm-abstractRedisBolt.png) + +`AbstractRedisBolt`中比较重要的是prepare方法,在该方法中通过外部传入的jedis连接池配置( jedisPoolConfig/jedisClusterConfig) 创建用于管理Jedis实例的容器`JedisCommandsInstanceContainer`。 + +```java +public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt { + protected 
OutputCollector collector; + + private transient JedisCommandsInstanceContainer container; + + private JedisPoolConfig jedisPoolConfig; + private JedisClusterConfig jedisClusterConfig; + + ...... + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { + // FIXME: stores map (stormConf), topologyContext and expose these to derived classes + this.collector = collector; + + if (jedisPoolConfig != null) { + this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig); + } else if (jedisClusterConfig != null) { + this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig); + } else { + throw new IllegalArgumentException("Jedis configuration not found"); + } + } + + ....... +} +``` + +`JedisCommandsInstanceContainer`的`build()`方法如下,实际上就是创建JedisPool或JedisCluster并传入容器中。 + +```java +public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { + JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase()); + return new JedisContainer(jedisPool); + } + + public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { + JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG); + return new JedisClusterContainer(jedisCluster); + } +``` + +### 3.2 RedisStoreBolt和RedisLookupBolt + +`RedisStoreBolt`中比较重要的是process方法,该方法主要从storeMapper中获取传入key/value的值,并按照其存储类型`dataType`调用jedisCommand的对应方法进行存储。 + +RedisLookupBolt 的实现基本类似,从lookupMapper中获取传入的key值,并进行查询操作。 + +```java +public class RedisStoreBolt extends AbstractRedisBolt { + private final RedisStoreMapper storeMapper; + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + + @Override + public void process(Tuple input) { + String key = storeMapper.getKeyFromTuple(input); + String value = storeMapper.getValueFromTuple(input); + + JedisCommands jedisCommand = null; + try { + jedisCommand = getInstance(); + + switch (dataType) { + case STRING: + jedisCommand.set(key, value); + break; + + case LIST: + jedisCommand.rpush(key, value); + break; + + case HASH: + jedisCommand.hset(additionalKey, key, value); + break; + + case SET: + jedisCommand.sadd(key, value); + break; + + case SORTED_SET: + jedisCommand.zadd(additionalKey, Double.valueOf(value), key); + break; + + case HYPER_LOG_LOG: + jedisCommand.pfadd(key, value); + break; + + case GEO: + String[] array = value.split(":"); + if (array.length != 2) { + throw new IllegalArgumentException("value structure should be longitude:latitude"); + } + + double longitude = Double.valueOf(array[0]); + double latitude = Double.valueOf(array[1]); + jedisCommand.geoadd(additionalKey, 
longitude, latitude, key); + break; + + default: + throw new IllegalArgumentException("Cannot process such data type: " + dataType); + } + + collector.ack(input); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(input); + } finally { + returnInstance(jedisCommand); + } + } + + ......... +} + +``` + +### 3.3 JedisCommands + +JedisCommands接口中定义了所有的 Redis 客户端命令,它有以下三个实现类,分别是Jedis、JedisCluster、ShardedJedis。Strom中主要使用前两种实现类,具体调用哪一个实现类来执行命令,由传入的是jedisPoolConfig还是jedisClusterConfig来决定。 + +![storm-jedicCommands](D:\BigData-Notes\pictures\storm-jedicCommands.png) + +### 3.4 RedisMapper 和 TupleMapper + +RedisMapper 和 TupleMapper 定义了 tuple 和 Redis 中的数据如何进行映射转换。 + +![storm-Redis-Mapper](D:\BigData-Notes\pictures\storm-Redis-Mapper.png) + +#### 1. TupleMapper + +TupleMapper 主要定义了两个方法: + ++ getKeyFromTuple(ITuple tuple): 从tuple中获取那个字段作为Key; + ++ getValueFromTuple(ITuple tuple):从tuple中获取那个字段作为Value; + +#### 2. RedisMapper + +定义了获取数据类型的方法`getDataTypeDescription()`,RedisDataTypeDescription中RedisDataType枚举类定义了所有可用的Redis数据类型: + +```java +public class RedisDataTypeDescription implements Serializable { + + public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO } + ...... + } +``` + +#### 3. RedisStoreMapper + +RedisStoreMapper继承TupleMapper和RedisMapper接口,用于存储数据时,没有定义额外方法。 + +#### 4. RedisLookupMapper + +RedisLookupMapper继承TupleMapper和RedisMapper接口, + ++ 定义了declareOutputFields方法,声明输出的字段。 ++ 定义了toTuple方法,将查询结果组装为Storm的Values的集合,并用于发送。 + +下面的例子表示从输入`Tuple`的获取`word`字段作为key,使用`RedisLookupBolt`进行查询后,将key和查询结果value组装为values并发送到下一个处理单元。 + +```java +class WordCountRedisLookupMapper implements RedisLookupMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountRedisLookupMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public List toTuple(ITuple input, Object value) { + String member = getKeyFromTuple(input); + List values = Lists.newArrayList(); + values.add(new Values(member, value)); + return values; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("wordName", "count")); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return null; + } +} +``` + +#### 5. 
RedisFilterMapper + +RedisFilterMapper继承TupleMapper和RedisMapper接口,用于查询数据时,定义了declareOutputFields方法,声明输出的字段。如下面的实现: + +```java +@Override +public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("wordName", "count")); +} + +``` + +## 四、自定义RedisBolt实现词频统计 + +自定义RedisBolt:利用Redis中哈希结构的hincrby key field命令进行词频统计。在Redis中`hincrby`的执行效果如下,如果在执行时字段不存在,则在执行操作之前将值设置为0。通过这个命令可以非常轻松的实现词频统计功能。 + +```shell +redis> HSET myhash field 5 +(integer) 1 +redis> HINCRBY myhash field 1 +(integer) 6 +redis> HINCRBY myhash field -1 +(integer) 5 +redis> HINCRBY myhash field -10 +(integer) -5 +redis> +``` + +### 4.1 自定义RedisBolt的代码实现 + +```java +/** + * 自定义RedisBolt 利用Redis的哈希数据结构的hincrby key field命令进行词频统计 + */ +public class RedisCountStoreBolt extends AbstractRedisBolt { + + private final RedisStoreMapper storeMapper; + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + @Override + protected void process(Tuple tuple) { + String key = storeMapper.getKeyFromTuple(tuple); + String value = storeMapper.getValueFromTuple(tuple); + + JedisCommands jedisCommand = null; + try { + jedisCommand = getInstance(); + if (dataType == RedisDataTypeDescription.RedisDataType.HASH) { + jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value)); + } else { + throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType); + } + + collector.ack(tuple); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(tuple); + } finally { + returnInstance(jedisCommand); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} +``` + +### 4.2 CustomRedisCountApp + +```java +/** + * 利用自定义的RedisBolt实现词频统计 + */ +public class CustomRedisCountApp { + + private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; + private static final String SPLIT_BOLT = "splitBolt"; + private static final String STORE_BOLT = "storeBolt"; + + private static final String REDIS_HOST = "192.168.200.226"; + private static final int REDIS_PORT = 6379; + + public static void main(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout()); + // split + builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT); + // save to redis and count + JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper); + builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT); + + // 如果外部传参cluster则代表线上环境启动,否则代表本地启动 + if (args.length > 0 && args[0].equals("cluster")) { + try { + StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology()); + } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("LocalCustomRedisCountApp", + new Config(), builder.createTopology()); + } + } +} +``` \ No 
newline at end of file diff --git a/notes/installation/基于Zookeeper搭建Kafka高可用集群.md b/notes/installation/基于Zookeeper搭建Kafka高可用集群.md index e69de29..9fdcad1 100644 --- a/notes/installation/基于Zookeeper搭建Kafka高可用集群.md +++ b/notes/installation/基于Zookeeper搭建Kafka高可用集群.md @@ -0,0 +1,118 @@ +# 基于Zookeeper搭建Kafka高可用集群 + +## 一、Zookeeper集群搭建 + +## 二、Kafka集群搭建 + +### 2.1 下载解压 + +Kafka安装包官方下载地址:http://kafka.apache.org/downloads ,本用例下载的版本为`2.2.0`,下载命令: + +```shell +# 下载 +wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz +``` + +这里说明一下kafka安装包的命名规则:以`kafka_2.12-2.2.0.tgz`为例,前面的2.12代表Scala的版本号(Kafka是采用Scala语言开发的),后面的2.2.0代表Kafka的版本号。 + +```shell +# 解压 +tar -xzf kafka_2.12-2.2.0.tgz +``` + +### 2.2 拷贝配置文件 + +进入解压目录的` config`目录下 ,拷贝三份配置文件 + +```shell +# cp server.properties server-1.properties +# cp server.properties server-2.properties +# cp server.properties server-3.properties +``` + +### 2.3 修改配置 + +分别修改三份配置文件中的部分配置,如下: + +server-1.properties: + +```properties +# The id of the broker. 集群中每个节点的唯一标识 +broker.id=0 +# 监听地址 +listeners=PLAINTEXT://hadoop001:9092 +# 日志文件存放位置 +log.dirs=/usr/local/kafka-logs/00 +# Zookeeper连接地址 +zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183 +``` + +server-2.properties: + +```properties +broker.id=1 +listeners=PLAINTEXT://hadoop001:9093 +log.dirs=/usr/local/kafka-logs/01 +zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183 +``` + +server-3.properties: + +```properties +broker.id=2 +listeners=PLAINTEXT://hadoop001:9094 +log.dirs=/usr/local/kafka-logs/02 +zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183 +``` + +### 2.4 启动集群 + +分别指定不同配置文件,启动三个Kafka节点: + +```shell +bin/kafka-server-start.sh config/server-1.properties +bin/kafka-server-start.sh config/server-2.properties +bin/kafka-server-start.sh config/server-3.properties +``` + +启动后使用jps查看进程,此时应该有三个zookeeper进程和三个kafka进程: + +```shell +[root@hadoop001 kafka_2.12-2.2.0]# jps +14288 QuorumPeerMain +18385 Jps +16228 Kafka +17653 Kafka +14374 QuorumPeerMain +16826 Kafka +14446 QuorumPeerMain +``` + +### 2.5 创建测试主题 + +创建测试主题: + +```shell +bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic +``` + +查看主题信息: + +```shell +bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic +``` + +![kafka-cluster-shell](D:\BigData-Notes\pictures\kafka-cluster-shell.png) + +还可以进一步创建一个消费者和一个生产者进行测试: + +```shell +# 创建生产者 +bin/kafka-console-producer.sh --broker-list hadoop001:9093 --topic my-replicated-topic +``` + +```shell +# 创建消费者 +bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9094 --from-beginning --topic my-replicated-topic +``` + diff --git a/pictures/jar-with-dependencies.png b/pictures/jar-with-dependencies.png new file mode 100644 index 0000000..c1254bf Binary files /dev/null and b/pictures/jar-with-dependencies.png differ diff --git a/pictures/kafka-cluster-shell.png b/pictures/kafka-cluster-shell.png new file mode 100644 index 0000000..3826dba Binary files /dev/null and b/pictures/kafka-cluster-shell.png differ diff --git a/pictures/storm-Redis-Mapper.png b/pictures/storm-Redis-Mapper.png new file mode 100644 index 0000000..49217d9 Binary files /dev/null and b/pictures/storm-Redis-Mapper.png differ diff --git a/pictures/storm-abstractRedisBolt.png b/pictures/storm-abstractRedisBolt.png new file mode 100644 index 0000000..62c15df Binary files /dev/null and b/pictures/storm-abstractRedisBolt.png differ diff --git a/pictures/storm-jar2.png b/pictures/storm-jar2.png new 
file mode 100644 index 0000000..2771741 Binary files /dev/null and b/pictures/storm-jar2.png differ diff --git a/pictures/storm-jedicCommands.png b/pictures/storm-jedicCommands.png new file mode 100644 index 0000000..5c60e05 Binary files /dev/null and b/pictures/storm-jedicCommands.png differ