Storm integration examples

This commit is contained in:
罗祥 2019-04-19 17:58:15 +08:00
parent 756d0eb315
commit b5f452fa0a
17 changed files with 1955 additions and 92 deletions

View File

@ -9,7 +9,6 @@
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.2.2</storm.version>
</properties>
@ -20,6 +19,53 @@
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm HDFS integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
@ -83,53 +129,4 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm HDFS integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -8,6 +8,29 @@
<artifactId>storm-kafka-integration</artifactId>
<version>1.0</version>
<properties>
<storm.version>1.2.2</storm.version>
<kafka.version>2.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
@ -68,27 +91,4 @@
</plugins>
</build>
<properties>
<storm.version>1.2.2</storm.version>
<kafka.version>2.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -14,7 +14,7 @@ import org.apache.storm.topology.TopologyBuilder;
import java.util.Properties;
/**
 * Writes data to Kafka
 */
public class WritingToKafkaApp {

View File

@ -9,10 +9,23 @@
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
@ -73,19 +86,4 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -5,7 +5,7 @@ import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;
/**
 * Defines the mapping between tuples and data in Redis
 */
public class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;

View File

@ -0,0 +1,309 @@
# A Comparison of Storm Packaging Methods
## 1. Introduction
Before a Storm topology can be submitted to a server cluster for execution, the project has to be packaged. This article compares the available packaging methods and explains what to watch out for during packaging. There are three main approaches:
+ First: use mvn package directly, without any plugins;
+ Second: package with the maven-assembly-plugin;
+ Third: package with the maven-shade-plugin.
Each is described in detail below.
## 2. mvn package
### 2.1 Limitations of mvn package
Packaging the project with `mvn package` alone, without configuring any plugins in the POM, works for projects that do not use external dependencies.
But if the project uses third-party JARs, problems arise, because the JAR produced by `mvn package` does not contain its dependencies; if you submit it to the server to run, you will get exceptions about missing third-party classes.
Is there a workaround if you want to package this way but still use third-party JARs? Yes; the official documentation covers this under [Command Line Client](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Command-line-client.html). The main options are as follows.
### 2.2 Workaround
When submitting a topology with `storm jar topology-jar-path class ...` and third-party dependencies need to be specified, you can do the following:
+ If the third-party JARs are available locally, specify them with `--jars`;
+ If they are in the remote central repository, specify them with `--artifacts`; to exclude certain transitive dependencies, use the `^` symbol;
+ If they are in another repository, also point to it with `--artifactRepositories`; the repository name and address are separated by the `^` symbol.
Here is an example covering all three cases:
```shell
./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
```
## 3. The maven-assembly-plugin
### 3.1 What the official documentation says
The maven-assembly-plugin is the packaging method described in the official documentation; the following is quoted from [Running Topologies on a Production Cluster](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Running-topologies-on-a-production-cluster.html):
> If you're using Maven, the [Maven Assembly Plugin](http://maven.apache.org/plugins/maven-assembly-plugin/) can do the packaging for you. Just add this to your pom.xml:
>
> ```xml
> <plugin>
> <artifactId>maven-assembly-plugin</artifactId>
> <configuration>
> <descriptorRefs>
> <descriptorRef>jar-with-dependencies</descriptorRef>
> </descriptorRefs>
> <archive>
> <manifest>
> <mainClass>com.path.to.main.Class</mainClass>
> </manifest>
> </archive>
> </configuration>
> </plugin>
> ```
>
> Then run mvn assembly:assembly to get an appropriately packaged jar. Make sure you [exclude](http://maven.apache.org/plugins/maven-assembly-plugin/examples/single/including-and-excluding-artifacts.html) the Storm jars since the cluster already has Storm on the classpath.
Two points stand out:
- Package with the maven-assembly-plugin, because it bundles all dependencies into the final JAR;
- Exclude the Storm jars that the cluster environment already provides.
Using the maven-assembly-plugin is very simple: declare it in pom.xml and set the packaging format to `jar-with-dependencies` in the \<descriptorRef> tag. That leaves one question: how do we exclude the Storm jars?
### 3.2 Excluding the Storm jars
Note that `jar-with-dependencies` is one of Maven's built-in assembly descriptors; it is documented in [Pre-defined Descriptor Files](http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html):
![jar-with-dependencies](D:\BigData-Notes\pictures\jar-with-dependencies.png)
If you want to exclude a dependency, for example `storm-core`, you can modify the XML of the `jar-with-dependencies` descriptor:
```xml
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>jar-with-dependencies</id>
<!-- specify the output format -->
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<!-- exclude storm-core, which the Storm environment already provides -->
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
```
### 3.3 Final configuration
When packaging with the maven-assembly-plugin, the final configuration should look like this:
#### 1. Declare the plugin
Declare the plugin in pom.xml and point it to the descriptor file `assembly.xml` (the name can be customized):
```xml
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/resources/assembly.xml</descriptor>
</descriptors>
<archive>
<manifest>
<mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
```
assembly.xml文件内容如下
```xml
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>jar-with-dependencies</id>
<!-- specify the output format -->
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<!-- exclude storm-core, which the Storm environment already provides -->
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
```
> The descriptor can exclude not only dependencies but also specific files; for more configuration rules see the official documentation: [Descriptor Format](http://maven.apache.org/plugins/maven-assembly-plugin/assembly.html#)
#### 2. Packaging command
When packaging with the maven-assembly-plugin, the command is:
```shell
# mvn assembly:assembly
```
Packaging produces two JARs; the one with the `jar-with-dependencies` suffix contains the third-party dependencies. The suffix is set by the `<id>` tag in `assembly.xml` and can be customized. Submit that JAR to the cluster environment and it can be used directly.
![storm-jar](D:\BigData-Notes\pictures\storm-jar.png)
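For reference, submitting the assembled JAR then follows the usual `storm jar` form; the JAR name below is a placeholder, and the main class is the one configured in the plugin above:
```shell
# the JAR name is a placeholder; adjust it to your own artifactId and version
storm jar target/your-project-1.0-jar-with-dependencies.jar com.heibaiying.wordcount.ClusterWordCountApp
```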
## 4. The maven-shade-plugin
### 4.1 What the official documentation says
The third approach uses the maven-shade-plugin. Given that the maven-assembly-plugin already exists, why is the maven-shade-plugin needed? The official documentation explains this in the chapter on HDFS integration, [Storm HDFS Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html); the original text follows:
>When packaging your topology, it's important that you use the [maven-shade-plugin](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html) as opposed to the [maven-assembly-plugin](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html).
>
>The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme resolution.
>
>If you experience errors such as the following:
>
>```
>java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
>```
>
>it's an indication that your topology jar file isn't packaged properly.
>
>If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to create your topology jar.
The first sentence is already clear: when integrating with HDFS you must use the maven-shade-plugin instead of the maven-assembly-plugin, otherwise a RuntimeException will be thrown.
The maven-shade-plugin also has other advantages. For example, your project may depend on many JARs, and those JARs in turn depend on others, so you can end up with different versions of a JAR, or JARs containing resource files with the same name. In that case the shade plugin tries to merge all the resource files rather than overwriting them as the assembly plugin does.
### 4.2 Configuration
An example configuration:
```xml
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
```
Notes on the configuration:
Some JARs are signed with jarsigner when they are built (for integrity checking); the signature is stored as two files in the META-INF directory:
+ a signature file, with a .SF extension
+ a signature block file, with a .DSA, .RSA, or .EC extension
If such packages are pulled in more than once, this can cause an `Invalid signature file digest for Manifest main attributes` exception during packaging, which is why the configuration above excludes these files.
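If a JAR has already been built with signature files inside, a commonly used manual cleanup is to delete those entries with `zip`; this is only a fallback sketch, and the JAR name is a placeholder:
```shell
zip -d your-topology-jar-with-dependencies.jar 'META-INF/*.SF' 'META-INF/*.DSA' 'META-INF/*.RSA'
```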
### 4.3 Packaging command
When packaging with the maven-shade-plugin, the command is the same as usual:
```shell
# mvn package
```
Packaging produces two JARs; when submitting to the server cluster, use the one whose name does not start with `original`.
![storm-jar2](D:\BigData-Notes\pictures\storm-jar2.png)
## 5. Conclusion
Having looked at all three packaging methods in detail, the final recommendation is: **use the maven-shade-plugin**. It is the most broadly applicable and the simplest to use, and all of the official Storm [examples](https://github.com/apache/storm/tree/master/examples) on GitHub use this third method.
## 6. Packaging caveats
Whichever packaging method you use, you must exclude the Storm jars that the cluster environment already provides. The typical case is storm-core, which already exists in the lib directory of the installation.
![storm-lib](D:\BigData-Notes\pictures\storm-lib.png)
If you do not exclude storm-core, you will usually get the following exception:
```properties
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/usr/app/apache-storm-1.2.2/lib/storm-core-1.2.2.jar!/defaults.yaml, jar:file:/usr/appjar/storm-hdfs-integration-1.0.jar!/defaults.yaml]
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:384)
at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:428)
at org.apache.storm.utils.Utils.readStormConfig(Utils.java:464)
at org.apache.storm.utils.Utils.<clinit>(Utils.java:178)
... 39 more
```
![storm-jar-complie-error](D:\BigData-Notes\pictures\storm-jar-complie-error.png)
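A quick sanity check, using the JAR name from the log above, is to list the JAR contents and confirm that no defaults.yaml was bundled:
```shell
# no output means defaults.yaml (and thus storm-core) is not inside the jar
jar tf storm-hdfs-integration-1.0.jar | grep defaults.yaml
```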

View File

@ -471,7 +471,7 @@ storm kill ClusterWordCountApp -w 3
### 1. Limitations of mvn package
Above, we did not configure any plugins in the POM and packaged the project directly with `mvn package`. This works for projects without external dependencies. But if the project uses third-party JARs, problems arise, because the JAR produced by `package` does not contain the dependencies; if you submit it to the server to run, you will get exceptions about missing third-party classes.
At this point you may wonder: doesn't our project use the `storm-core` dependency? The reason the run above still succeeded is that the Storm cluster environment already provides this JAR, under the lib directory of the installation:

View File

@ -0,0 +1,472 @@
# Integrating Storm with HDFS and HBase
## 1. Integrating Storm with HDFS
### 1.1 Project structure
> Source code for this example: [storm-hdfs-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hdfs-integration)
### 1.2 Main project dependencies
The main dependencies are listed below; two points deserve attention:
+ Because the Hadoop installed on my server is the CDH distribution, the imported dependencies are also CDH versions, so the CDH repository address has to be specified with a `<repository>` tag;
+ `hadoop-common`, `hadoop-client`, and `hadoop-hdfs` all need to exclude the `slf4j-log4j12` dependency, because `storm-core` already contains it; without the exclusion there is a risk of JAR conflicts.
```xml
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm HDFS integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
```
### 1.3 DataSourceSpout
```java
/**
* Data source spout that generates word-frequency sample data
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// generate mock data
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* Build a line of mock data
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}
```
The generated mock data looks like this:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
### 1.4 Storing the data in HDFS
Here the HDFS address and the output path are hard-coded; in real development they can be passed in from outside to make the program more flexible (a brief sketch of this follows the listing below).
```java
public class DataToHdfsApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String HDFS_BOLT = "hdfsBolt";
public static void main(String[] args) {
// Specify the Hadoop user name; if not set, creating directories on HDFS may throw a permission exception (RemoteException: Permission denied)
System.setProperty("HADOOP_USER_NAME", "root");
// Define the delimiter between output fields
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// Sync policy: flush buffered data to HDFS every 100 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(100);
// Rotation policy: each file is capped at 1 MB; once the limit is reached, a new file is created and writing continues
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
// Define the output path
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm-hdfs/");
// Define the HdfsBolt
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://hadoop001:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
// Build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// save to HDFS
builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);
// If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalDataToHdfsApp",
new Config(), builder.createTopology());
}
}
}
```
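A minimal sketch of that parameterization, assuming the HDFS URL and the output path are passed as the second and third program arguments (the argument layout is illustrative only):
```java
// Hypothetical argument layout: args[0]=mode, args[1]=HDFS URL, args[2]=output path
String fsUrl = args.length > 1 ? args[1] : "hdfs://hadoop001:8020";
String outputPath = args.length > 2 ? args[2] : "/storm-hdfs/";

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl(fsUrl)
        .withFileNameFormat(new DefaultFileNameFormat().withPath(outputPath))
        .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
        .withRotationPolicy(new FileSizeRotationPolicy(1.0f, Units.MB))
        .withSyncPolicy(new CountSyncPolicy(100));
```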
### 1.5 Running the test
You can run it directly in local mode, or package it and submit it to the server cluster. The source in this repository is packaged with the `maven-shade-plugin` by default; the packaging command is:
```shell
# mvn clean package -D maven.test.skip=true
```
运行后数据会存储到HDFS的`/storm-hdfs`目录下。使用以下命令可以查看目录内容:
```shell
# list the directory contents
hadoop fs -ls /storm-hdfs
# follow changes to a file's content
hadoop fs -tail -f /storm-hdfs/<file-name>
```
![storm-hdfs-result](D:\BigData-Notes\pictures\storm-hdfs-result.png)
## 2. Integrating Storm with HBase
### 2.1 Project structure
Source code for this example: [storm-hbase-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hbase-integration)
### 2.2 Main project dependencies
```xml
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm HBase integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
```
### 2.3 DataSourceSpout
```java
/**
* Data source spout that generates word-frequency sample data
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// generate mock data
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* Build a line of mock data
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}
```
The generated mock data looks like this:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
### 2.4 SplitBolt
```java
/**
* Splits each line on the specified delimiter
*/
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(tuple(word, 1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
```
### 2.5 CountBolt
```java
/**
* Performs word-frequency counting
*/
public class CountBolt extends BaseRichBolt {
private Map<String, Integer> counts = new HashMap<>();
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// emit the result
collector.emit(new Values(word, String.valueOf(count)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
```
### 2.6 WordCountToHBaseApp
```java
/**
* Performs word counting and stores the results in HBase
*/
public class WordCountToHBaseApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String HBASE_BOLT = "hbaseBolt";
public static void main(String[] args) {
// Storm configuration
Config config = new Config();
// HBase configuration
Map<String, Object> hbConf = new HashMap<>();
hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");
// Pass the HBase configuration into the Storm configuration
config.put("hbase.conf", hbConf);
// Define the mapping between the stream data and HBase
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("word","count"))
.withColumnFamily("info");
/*
* Pass the table name, the data mapping, and the HBase configuration key to the HBaseBolt.
* The table must be created in advance: create 'WordCount','info'
*/
HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
.withConfigKey("hbase.conf");
// Build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
// save to HBase
builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);
// If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToRedisApp",
config, builder.createTopology());
}
}
}
```
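As the comment in the code points out, the target table has to exist before the topology runs; it can be created from the HBase shell:
```shell
hbase > create 'WordCount','info'
```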
### 2.7 Running the test
You can run it directly in local mode, or package it and submit it to the server cluster. The source in this repository is packaged with the `maven-shade-plugin` by default; the packaging command is:
```shell
# mvn clean package -D maven.test.skip=true
```
运行后数据会存储到HBase的`WordCount`表中。使用以下命令查看表的内容:
```shell
hbase > scan 'WordCount'
```
![storm-hbase-result](D:\BigData-Notes\pictures\storm-hbase-result.png)
### 2.8 withCounterFields
In the example above we implemented the word count by hand and then stored the final result in HBase. Alternatively, when building the `SimpleHBaseMapper` you can designate the count field with `withCounterFields`; the designated field is incremented automatically, which also gives you a word count. Note that the fields passed to withCounterFields must be of type Long, not String.
```java
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("word"))
.withCounterFields(new Fields("count"))
.withColumnFamily("cf");
```
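For the counter approach, the bolt feeding the HBaseBolt must emit the count as a Long rather than a String; a minimal sketch of the emit call, reusing the field names above, is:
```java
// emit the count as a Long so that withCounterFields can increment it in HBase
collector.emit(new Values(word, 1L));
```
With this mapper, the SplitBolt could feed the HBaseBolt directly, and the separate CountBolt would no longer be required.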

notes/Storm集成Kakfa.md (new file)
View File

@ -0,0 +1,344 @@
# Integrating Storm with Kafka
## 1. Integration overview
Storm's official Kafka integration comes in two versions, documented separately:
+ [Storm Kafka Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html): mainly provides integration support for Kafka 0.8.x;
+ [Storm Kafka Integration (0.10.x+)](): uses the new Kafka consumer API and mainly targets Kafka 0.10.x and later.
The Kafka version installed on my server is 2.2.0 (released Mar 22, 2019), so the integration follows the official 0.10.x+ documentation and does not apply to Kafka 0.8.x.
## 2. Writing data to Kafka
### 2.1 Project structure
### 2.2 Main project dependencies
```xml
<properties>
<storm.version>1.2.2</storm.version>
<kafka.version>2.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
```
### 2.3 DataSourceSpout
```java
/**
* Data source spout that generates word-frequency sample data
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// generate mock data
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* Build a line of mock data
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}
```
The generated mock data looks like this:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
### 2.4 WritingToKafkaApp
```java
/**
* Writes data to Kafka
*/
public class WritingToKafkaApp {
private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
private static final String TOPIC_NAME = "storm-topic";
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
// Define the Kafka producer properties
Properties props = new Properties();
/*
* Specify the list of broker addresses. The list does not need to contain every broker; the producer discovers the others from the ones given.
* It is still recommended to provide at least two brokers for fault tolerance.
*/
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
/*
* The acks parameter specifies how many partition replicas must receive the message before the producer considers the write successful.
* acks=0 : the producer does not wait for any response from the server before treating the write as successful.
* acks=1 : the producer gets a success response from the server as soon as the cluster's leader node has received the message.
* acks=all : the producer only gets a success response once every replicating node has received the message.
*/
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt<String, String>()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());
builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWritingToKafkaApp",
new Config(), builder.createTopology());
}
}
}
```
### 2.5 Test preparation
Kafka needs to be started before testing.
#### 1. Start Kafka
Kafka depends on ZooKeeper, which must be started first; you can start the ZooKeeper bundled with Kafka or your own installation.
```shell
# start a standalone ZooKeeper
bin/zkServer.sh start
# start the ZooKeeper bundled with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Start a single-node Kafka broker:
```shell
# bin/kafka-server-start.sh config/server.properties
```
#### 2. Create the topic
```shell
# create the topic used for testing
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic
# list all topics
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
```
#### 3. Start a consumer to observe the writes
```shell
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning
```
### 2.6 Testing
You can run it directly in local mode, or package it and submit it to the server cluster. The source in this repository is packaged with the `maven-shade-plugin` by default; the packaging command is:
```shell
# mvn clean package -D maven.test.skip=true
```
After starting, the consumer output looks like this:
![strom-kafka-consumer](D:\BigData-Notes\pictures\strom-kafka-consumer.png)
## 3. Reading data from Kafka
### 3.1 Project structure
### 3.2 ReadingFromKafkaApp
```java
/**
* Reads data from Kafka
*/
public class ReadingFromKafkaApp {
private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
private static final String TOPIC_NAME = "storm-topic";
public static void main(String[] args) {
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");
// If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalReadingFromKafkaApp",
new Config(), builder.createTopology());
}
}
private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
return KafkaSpoutConfig.builder(bootstrapServers, topic)
// Apart from the group ID, the settings below are all optional. The group ID must be specified, otherwise an InvalidGroupIdException is thrown
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
// Define the retry policy
.setRetry(getRetryService())
// Interval for periodic offset commits; the default is 15 s
.setOffsetCommitPeriodMs(10_000)
.build();
}
// Define the retry policy
private static KafkaSpoutRetryService getRetryService() {
return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
}
}
```
### 3.3 LogConsoleBolt
```java
/**
* Prints the data received from Kafka
*/
public class LogConsoleBolt extends BaseRichBolt {
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
public void execute(Tuple input) {
try {
String value = input.getStringByField("value");
System.out.println("received from kafka : "+ value);
// The tuple must be acked; otherwise the Kafka messages will be consumed repeatedly
collector.ack(input);
}catch (Exception e){
e.printStackTrace();
collector.fail(input);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
```
Here the value sent by Kafka is read from the `value` field.
You can implement the `RecordTranslator` interface to define how a Kafka record is translated into the output stream; the translator can be passed in when building the `KafkaSpoutConfig`, either through the constructor or via `setRecordTranslator()`, and is ultimately handed to the `KafkaSpout`. If none is specified, the built-in `DefaultRecordTranslator` is used; its source is shown below, and `FIELDS` defines all the fields available in the tuple:
```java
public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
private static final long serialVersionUID = -5782462870112305750L;
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
@Override
public List<String> streams() {
return DEFAULT_STREAM;
}
}
```
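As a minimal sketch (the class name and field selection are made up for illustration), a custom translator that exposes only the topic and the value could look like this:
```java
public class TopicValueRecordTranslator<K, V> implements RecordTranslator<K, V> {

    public static final Fields FIELDS = new Fields("topic", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        // keep only the topic and the value of each record
        return new Values(record.topic(), record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

    @Override
    public List<String> streams() {
        return DEFAULT_STREAM;
    }
}
```
It would then be registered when building the spout config, e.g. `KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC_NAME).setRecordTranslator(new TopicValueRecordTranslator<>())`, and the downstream bolt would read the field names declared in `FIELDS`.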
### 3.4 Running the test
Start a producer to send test data; the command is:
```shell
# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic
```
![storm-kafka-producer](D:\BigData-Notes\pictures\storm-kafka-producer.png)
The locally running project receives the data sent from Kafka:
![storm-kafka-receiver](D:\BigData-Notes\pictures\storm-kafka-receiver.png)

View File

@ -0,0 +1,625 @@
# Integrating Storm with Redis in Detail
## 1. Introduction
storm-redis provides integration between Storm and Redis; you only need to add the corresponding dependency to use it. storm-redis uses Jedis as its Redis client and provides basic bolt implementations, `RedisLookupBolt` and `RedisStoreBolt`:
+ RedisLookupBolt: queries data from Redis;
+ RedisStoreBolt: stores data into Redis;
+ RedisFilterBolt: filters data that matches a condition.
`RedisLookupBolt` and `RedisStoreBolt` both extend the abstract class `AbstractRedisBolt`; we can also extend that class ourselves and add functionality for our own business logic.
```xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
<type>jar</type>
</dependency>
```
## 2. Integration example
### 2.1 Project structure
### 2.2 Project dependencies
The main dependencies are:
```xml
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
```
### 2.3 DataSourceSpout
```java
/**
* Data source spout that generates word-frequency sample data
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// generate mock data
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* Build a line of mock data
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}
```
The generated mock data looks like this:
```properties
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```
### 2.4 SplitBolt
```java
/**
* Splits each line on the specified delimiter
*/
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(new Values(word, String.valueOf(1)));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
```
### 2.5 CountBolt
```java
/**
* Performs word-frequency counting
*/
public class CountBolt extends BaseRichBolt {
private Map<String, Integer> counts = new HashMap<>();
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// emit the result
collector.emit(new Values(word, String.valueOf(count)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
```
### 2.6 WordCountStoreMapper
Implement RedisStoreMapper and define the mapping between tuples and the data in Redis. Redis stores key/value pairs and supports several data structures, so you need to specify which tuple field is the key, which field is the value, and which data structure to store them in.
```java
/**
* Defines the mapping between tuples and data in Redis
*/
public class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("count");
}
}
```
### 2.7 WordCountToRedisApp
```java
/**
* Performs word counting and stores the results in Redis
*/
public class WordCountToRedisApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String STORE_BOLT = "storeBolt";
// In real development these parameters could be passed in from outside to make the program more flexible
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
// save to redis
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);
// If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToRedisApp",
new Config(), builder.createTopology());
}
}
}
```
### 2.8 Running the test
You can run it directly in local mode, or package it and submit it to the server cluster. The source in this repository is packaged with the `maven-shade-plugin` by default; the packaging command is:
```shell
# mvn clean package -D maven.test.skip=true
```
启动后查看Redis中的数据
![store-redis-manager](D:\BigData-Notes\pictures\store-redis-manager.png)
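You can also inspect the hash directly with redis-cli, using the host, port, and hash key configured in this example:
```shell
redis-cli -h 192.168.200.226 -p 6379 hgetall wordCount
```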
## 3. How storm-redis works
### 3.1 AbstractRedisBolt
`RedisLookupBolt` and `RedisStoreBolt` both extend the abstract class `AbstractRedisBolt`, which, just like our own custom bolts, indirectly extends `BaseRichBolt`.
![storm-abstractRedisBolt](D:\BigData-Notes\pictures\storm-abstractRedisBolt.png)
The important method in `AbstractRedisBolt` is prepare, which uses the externally supplied Jedis pool configuration (jedisPoolConfig/jedisClusterConfig) to create the `JedisCommandsInstanceContainer` that manages Jedis instances.
```java
public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
protected OutputCollector collector;
private transient JedisCommandsInstanceContainer container;
private JedisPoolConfig jedisPoolConfig;
private JedisClusterConfig jedisClusterConfig;
......
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
// FIXME: stores map (stormConf), topologyContext and expose these to derived classes
this.collector = collector;
if (jedisPoolConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
} else if (jedisClusterConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found");
}
}
.......
}
```
`JedisCommandsInstanceContainer``build()`方法如下实际上就是创建JedisPool或JedisCluster并传入容器中。
```java
public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
return new JedisContainer(jedisPool);
}
public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
return new JedisClusterContainer(jedisCluster);
}
```
### 3.2 RedisStoreBolt and RedisLookupBolt
The important method in `RedisStoreBolt` is process, which obtains the key/value from the storeMapper and, according to the storage type `dataType`, calls the corresponding jedisCommand method to store them.
RedisLookupBolt is implemented in much the same way: it obtains the key from the lookupMapper and performs the query.
```java
public class RedisStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
private final RedisDataTypeDescription.RedisDataType dataType;
private final String additionalKey;
public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
@Override
public void process(Tuple input) {
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
switch (dataType) {
case STRING:
jedisCommand.set(key, value);
break;
case LIST:
jedisCommand.rpush(key, value);
break;
case HASH:
jedisCommand.hset(additionalKey, key, value);
break;
case SET:
jedisCommand.sadd(key, value);
break;
case SORTED_SET:
jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
break;
case HYPER_LOG_LOG:
jedisCommand.pfadd(key, value);
break;
case GEO:
String[] array = value.split(":");
if (array.length != 2) {
throw new IllegalArgumentException("value structure should be longitude:latitude");
}
double longitude = Double.valueOf(array[0]);
double latitude = Double.valueOf(array[1]);
jedisCommand.geoadd(additionalKey, longitude, latitude, key);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
collector.ack(input);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
} finally {
returnInstance(jedisCommand);
}
}
.........
}
```
### 3.3 JedisCommands
The JedisCommands interface defines all Redis client commands and has three implementations: Jedis, JedisCluster, and ShardedJedis. Storm mainly uses the first two; which implementation actually executes the commands depends on whether a jedisPoolConfig or a jedisClusterConfig was passed in.
![storm-jedicCommands](D:\BigData-Notes\pictures\storm-jedicCommands.png)
### 3.4 RedisMapper and TupleMapper
RedisMapper and TupleMapper define how tuples and the data in Redis are mapped to each other.
![storm-Redis-Mapper](D:\BigData-Notes\pictures\storm-Redis-Mapper.png)
#### 1. TupleMapper
TupleMapper mainly defines two methods:
+ getKeyFromTuple(ITuple tuple): which field of the tuple to use as the key;
+ getValueFromTuple(ITuple tuple): which field of the tuple to use as the value.
#### 2. RedisMapper
It defines `getDataTypeDescription()` for obtaining the data type; the RedisDataType enum in RedisDataTypeDescription lists all the available Redis data types:
```java
public class RedisDataTypeDescription implements Serializable {
public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
......
}
```
#### 3. RedisStoreMapper
RedisStoreMapper extends the TupleMapper and RedisMapper interfaces and is used when storing data; it does not define any additional methods.
#### 4. RedisLookupMapper
RedisLookupMapper extends the TupleMapper and RedisMapper interfaces:
+ it defines declareOutputFields, which declares the output fields;
+ it defines toTuple, which assembles the query results into a collection of Storm Values for emitting.
The example below takes the `word` field of the input `Tuple` as the key; after the lookup with `RedisLookupBolt`, the key and the result value are assembled into values and emitted to the next processing unit.
```java
class WordCountRedisLookupMapper implements RedisLookupMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountRedisLookupMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public List<Values> toTuple(ITuple input, Object value) {
String member = getKeyFromTuple(input);
List<Values> values = Lists.newArrayList();
values.add(new Values(member, value));
return values;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName", "count"));
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return null;
}
}
```
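A brief sketch of how this mapper could be wired into a topology with `RedisLookupBolt`; the connection settings and the SPLIT_BOLT id are reused from the example above, while the "lookupBolt" id is made up:
```java
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost("192.168.200.226").setPort(6379).build();

RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

// consumes tuples carrying a "word" field and emits ("wordName", "count") downstream
builder.setBolt("lookupBolt", lookupBolt).shuffleGrouping(SPLIT_BOLT);
```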
#### 5. RedisFilterMapper
RedisFilterMapper extends the TupleMapper and RedisMapper interfaces and is used when filtering data; it defines declareOutputFields to declare the output fields, as in the implementation below:
```java
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName", "count"));
}
```
## 4. Word counting with a custom RedisBolt
The custom RedisBolt uses the hincrby key field command of Redis's hash structure to do the word counting. The effect of `hincrby` in Redis is shown below; if the field does not exist when the command runs, its value is set to 0 before the operation. With this command, word counting becomes very easy to implement.
```shell
redis> HSET myhash field 5
(integer) 1
redis> HINCRBY myhash field 1
(integer) 6
redis> HINCRBY myhash field -1
(integer) 5
redis> HINCRBY myhash field -10
(integer) -5
redis>
```
### 4.1 Implementation of the custom RedisBolt
```java
/**
* Custom RedisBolt that uses the hincrby key field command of Redis's hash structure for word counting
*/
public class RedisCountStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
private final RedisDataTypeDescription.RedisDataType dataType;
private final String additionalKey;
public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
@Override
protected void process(Tuple tuple) {
String key = storeMapper.getKeyFromTuple(tuple);
String value = storeMapper.getValueFromTuple(tuple);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
} else {
throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
}
collector.ack(tuple);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(tuple);
} finally {
returnInstance(jedisCommand);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
```
### 4.2 CustomRedisCountApp
```java
/**
* Word counting implemented with the custom RedisBolt
*/
public class CustomRedisCountApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String STORE_BOLT = "storeBolt";
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// save to redis and count
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);
// If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalCustomRedisCountApp",
new Config(), builder.createTopology());
}
}
}
```

View File

@ -0,0 +1,118 @@
# Building a Highly Available Kafka Cluster on Zookeeper
## 1. Setting up the Zookeeper cluster
## 2. Setting up the Kafka cluster
### 2.1 Download and extract
The official download page for Kafka is http://kafka.apache.org/downloads ; the version used in this example is `2.2.0`. Download command:
```shell
# download
wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
```
A note on the naming of the Kafka package: taking `kafka_2.12-2.2.0.tgz` as an example, the leading 2.12 is the Scala version (Kafka is written in Scala) and the trailing 2.2.0 is the Kafka version.
```shell
# extract
tar -xzf kafka_2.12-2.2.0.tgz
```
### 2.2 Copy the configuration files
Go into the `config` directory of the extracted archive and make three copies of the configuration file:
```shell
# cp server.properties server-1.properties
# cp server.properties server-2.properties
# cp server.properties server-3.properties
```
### 2.3 Edit the configurations
Modify the following settings in each of the three configuration files:
server-1.properties
```properties
# The id of the broker. Unique identifier of each node in the cluster
broker.id=0
# listener address
listeners=PLAINTEXT://hadoop001:9092
# log file location
log.dirs=/usr/local/kafka-logs/00
# Zookeeper connection address
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
```
server-2.properties
```properties
broker.id=1
listeners=PLAINTEXT://hadoop001:9093
log.dirs=/usr/local/kafka-logs/01
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
```
server-3.properties
```properties
broker.id=2
listeners=PLAINTEXT://hadoop001:9094
log.dirs=/usr/local/kafka-logs/02
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
```
### 2.4 Start the cluster
Start three Kafka nodes, each with its own configuration file:
```shell
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties
```
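If you start all three brokers from a single shell session, it can be convenient to run them in the background; kafka-server-start.sh accepts a -daemon flag for that (shown here as an optional variant):
```shell
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
bin/kafka-server-start.sh -daemon config/server-3.properties
```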
After starting, check the processes with jps; there should now be three zookeeper processes and three kafka processes:
```shell
[root@hadoop001 kafka_2.12-2.2.0]# jps
14288 QuorumPeerMain
18385 Jps
16228 Kafka
17653 Kafka
14374 QuorumPeerMain
16826 Kafka
14446 QuorumPeerMain
```
### 2.5 Create a test topic
Create the test topic:
```shell
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
```
View the topic details:
```shell
bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic
```
![kafka-cluster-shell](D:\BigData-Notes\pictures\kafka-cluster-shell.png)
You can go further and create a producer and a consumer to test it:
```shell
# create a producer
bin/kafka-console-producer.sh --broker-list hadoop001:9093 --topic my-replicated-topic
```
```shell
# create a consumer
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9094 --from-beginning --topic my-replicated-topic
```

(Several new binary image files were added under pictures/, including pictures/storm-jar2.png; previews are not shown.)