Storm integration with other frameworks

罗祥 2019-04-18 16:51:38 +08:00
parent 85f2539edb
commit 756d0eb315
22 changed files with 516 additions and 110 deletions

View File

@@ -77,6 +77,10 @@ TODO
2. [Storm Core Concepts in Detail](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm核心概念详解.md)
3. [Storm Standalone Environment Setup](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md)
4. [Storm Programming Model in Detail](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md)
+5. Storm integration with Redis
+6. Storm integration with HDFS/HBase
+7. Storm integration with Kafka
+8. Two ways to package a Storm Topology
## 6. Flume

View File

@@ -9,36 +9,9 @@
    <version>1.0</version>
    <properties>
-       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>1.2.2</storm.version>
    </properties>
-   <build>
-       <plugins>
-           <plugin>
-               <groupId>org.apache.maven.plugins</groupId>
-               <artifactId>maven-compiler-plugin</artifactId>
-               <configuration>
-                   <source>8</source>
-                   <target>8</target>
-               </configuration>
-           </plugin>
-           <plugin>
-               <artifactId>maven-assembly-plugin</artifactId>
-               <configuration>
-                   <descriptors>
-                       <descriptor>src/main/resources/assembly.xml</descriptor>
-                   </descriptors>
-                   <archive>
-                       <manifest>
-                           <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
-                       </manifest>
-                   </archive>
-               </configuration>
-           </plugin>
-       </plugins>
-   </build>
    <dependencies>
        <dependency>
@@ -54,4 +27,65 @@
        </dependency>
    </dependencies>
+   <build>
+       <plugins>
+           <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-compiler-plugin</artifactId>
+               <configuration>
+                   <source>8</source>
+                   <target>8</target>
+               </configuration>
+           </plugin>
+           <!-- Package with the shade plugin -->
+           <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-shade-plugin</artifactId>
+               <configuration>
+                   <createDependencyReducedPom>true</createDependencyReducedPom>
+                   <filters>
+                       <filter>
+                           <artifact>*:*</artifact>
+                           <excludes>
+                               <exclude>META-INF/*.SF</exclude>
+                               <exclude>META-INF/*.sf</exclude>
+                               <exclude>META-INF/*.DSA</exclude>
+                               <exclude>META-INF/*.dsa</exclude>
+                               <exclude>META-INF/*.RSA</exclude>
+                               <exclude>META-INF/*.rsa</exclude>
+                               <exclude>META-INF/*.EC</exclude>
+                               <exclude>META-INF/*.ec</exclude>
+                               <exclude>META-INF/MSFTSIG.SF</exclude>
+                               <exclude>META-INF/MSFTSIG.RSA</exclude>
+                           </excludes>
+                       </filter>
+                   </filters>
+                   <artifactSet>
+                       <excludes>
+                           <exclude>org.apache.storm:storm-core</exclude>
+                       </excludes>
+                   </artifactSet>
+               </configuration>
+               <executions>
+                   <execution>
+                       <phase>package</phase>
+                       <goals>
+                           <goal>shade</goal>
+                       </goals>
+                       <configuration>
+                           <transformers>
+                               <transformer
+                                       implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                               <transformer
+                                       implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                               </transformer>
+                           </transformers>
+                       </configuration>
+                   </execution>
+               </executions>
+           </plugin>
+       </plugins>
+   </build>
</project>
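This build section swaps the assembly plugin for the shade plugin: storm-core is excluded because the Storm runtime already provides it on the cluster classpath, and the META-INF signature entries are filtered out so the shaded fat jar does not fail signature verification at runtime. Assuming the conventions used elsewhere in this repository, the jar would be built with `mvn clean package -Dmaven.test.skip=true` and submitted with something like `storm jar storm-hbase-integration-1.0.jar com.heibaiying.WordCountToHBaseApp cluster` (the jar name here is illustrative).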

View File

@@ -1,5 +1,6 @@
package com.heibaiying;
+import com.heibaiying.component.CountBolt;
import com.heibaiying.component.DataSourceSpout;
import com.heibaiying.component.SplitBolt;
import org.apache.storm.Config;
@@ -18,9 +19,6 @@ import java.util.Map;
/**
 * Counts word frequencies and stores the results in HBase
- * <p>
- * Build and package: mvn clean assembly:assembly -Dmaven.test.skip=true
- * hdfs://hadoop001:8020/hbase
 */
public class WordCountToHBaseApp {
@@ -45,11 +43,13 @@ public class WordCountToHBaseApp {
        // Define the mapping between the stream data and HBase
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
-               .withColumnFields(new Fields("word"))
-               .withCounterFields(new Fields("count"))
-               .withColumnFamily("cf");
+               .withColumnFields(new Fields("word","count"))
+               .withColumnFamily("info");

-       // Pass the table name, the data mapping, and the HBase configuration to the HBaseBolt
+       /*
+        * Pass the table name, the data mapping, and the HBase configuration to the HBaseBolt
+        * The table must be created in advance: create 'WordCount','info'
+        */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");
@@ -58,12 +58,14 @@ public class WordCountToHBaseApp {
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
+       // count
+       builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
-       builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
+       builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise start locally
-       if (args.length > 1 && args[1].equals("cluster")) {
+       if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {

View File

@@ -0,0 +1,47 @@
package com.heibaiying.component;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
 * Counts word frequencies
 */
public class CountBolt extends BaseRichBolt {
private Map<String, Integer> counts = new HashMap<>();
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// Emit the word and its current running count
collector.emit(new Values(word, String.valueOf(count)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
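CountBolt consumes the "word" field emitted by the SplitBolt, whose own source is unchanged and therefore not shown in this diff. For context, a minimal sketch of a split bolt that would satisfy this topology, assuming the spout emits a tab-separated "line" field (both field names and the split logic are assumptions, not the repository's actual code):

package com.heibaiying.component;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Splits each incoming line into words (illustrative sketch only)
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // "line" is an assumed field name for the spout's raw output
        String line = input.getStringByField("line");
        for (String word : line.split("\t")) {
            // Emit one tuple per word; CountBolt reads this "word" field
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}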

View File

@@ -6,7 +6,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;

import java.util.Map;

View File

@@ -1,25 +0,0 @@
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>with-dependencies</id>
<!-- Specify the packaging format -->
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<!-- Exclude storm-core, which the Storm runtime already provides -->
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -43,10 +43,24 @@
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
-                           <exclude>org.apache.storm:storm-core</exclude>
+                           <exclude>META-INF/*.SF</exclude>
+                           <exclude>META-INF/*.sf</exclude>
+                           <exclude>META-INF/*.DSA</exclude>
+                           <exclude>META-INF/*.dsa</exclude>
+                           <exclude>META-INF/*.RSA</exclude>
+                           <exclude>META-INF/*.rsa</exclude>
+                           <exclude>META-INF/*.EC</exclude>
+                           <exclude>META-INF/*.ec</exclude>
+                           <exclude>META-INF/MSFTSIG.SF</exclude>
+                           <exclude>META-INF/MSFTSIG.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
+               <artifactSet>
+                   <excludes>
+                       <exclude>org.apache.storm:storm-core</exclude>
+                   </excludes>
+               </artifactSet>
            </configuration>
            <executions>
                <execution>
@@ -82,6 +96,17 @@
            <artifactId>storm-hdfs</artifactId>
            <version>${storm.version}</version>
        </dependency>
+       <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-common</artifactId>
+           <version>2.6.0-cdh5.15.2</version>
+           <exclusions>
+               <exclusion>
+                   <groupId>org.slf4j</groupId>
+                   <artifactId>slf4j-log4j12</artifactId>
+               </exclusion>
+           </exclusions>
+       </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>

View File

@@ -20,18 +20,19 @@ import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;

/**
- * Counts word frequencies and stores the results in HDFS
- * <p>
- * hdfs://hadoopp001:8020 path
+ * Stores sample data in HDFS
 */
-public class WordCountToHdfsApp {
+public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

-       // Define the delimiter for the stored text
+       // Specify the Hadoop user name; if it is not set, creating directories on HDFS may fail with a permission error (RemoteException: Permission denied)
+       System.setProperty("HADOOP_USER_NAME", "root");

+       // Define the delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");
@@ -41,7 +42,7 @@ public class WordCountToHdfsApp {
        // File rotation policy: each file is capped at 1 MB; when the limit is reached, a new file is created and writing continues
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

-       // Define the full path
+       // Define the storage path
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");
@@ -57,20 +58,20 @@ public class WordCountToHdfsApp {
        // Build the Topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
-       // save to HBase
+       // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise start locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
-               StormSubmitter.submitTopology("ClusterWordCountToHdfsApp", new Config(), builder.createTopology());
+               StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
-           cluster.submitTopology("LocalWordCountToHdfsApp",
+           cluster.submitTopology("LocalDataToHdfsApp",
                    new Config(), builder.createTopology());
        }
    }
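The construction of `hdfsBolt` falls between the hunks shown above, so it does not appear in this diff. Based on the storm-hdfs API and the format and policies defined earlier in this file, the wiring presumably looks roughly like the following sketch (the sync count and the namenode URL are assumptions, the URL chosen for consistency with the hosts used elsewhere in this commit):

        // Sync policy: flush buffered tuples to HDFS every 100 tuples (the count is an assumption)
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // Assemble the HdfsBolt from the format and policies defined above
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);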

View File

@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.heibaiying</groupId>
<artifactId>storm-kafka-integration</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- Package with the shade plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<storm.version>1.2.2</storm.version>
<kafka.version>2.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,40 @@
package com.heibaiying.kafka.read;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * Prints the data received from Kafka
 */
public class LogConsoleBolt extends BaseRichBolt {
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
public void execute(Tuple input) {
try {
String value = input.getStringByField("value");
System.out.println("received from kafka : "+ value);
// The tuple must be acked; otherwise the Kafka messages will be consumed repeatedly
collector.ack(input);
}catch (Exception e){
e.printStackTrace();
collector.fail(input);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
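Note that the bolt declares no output fields and only acks or fails: a failed tuple is not retried by the bolt itself but handed back to the KafkaSpout, which re-emits it according to the retry policy configured in ReadingFromKafkaApp below.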

View File

@@ -0,0 +1,61 @@
package com.heibaiying.kafka.read;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Reads data from Kafka
 */
public class ReadingFromKafkaApp {
private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
private static final String TOPIC_NAME = "storm-topic";
public static void main(String[] args) {
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");
// If "cluster" is passed as an argument, submit to the cluster; otherwise start locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalReadingFromKafkaApp",
new Config(), builder.createTopology());
}
}
private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
return KafkaSpoutConfig.builder(bootstrapServers, topic)
// Apart from the group ID, all of the following settings are optional; the group ID must be set, otherwise an InvalidGroupIdException is thrown
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
// Define the retry policy
.setRetry(getRetryService())
// Interval for periodically committing offsets; the default is 15s
.setOffsetCommitPeriodMs(10_000)
.build();
}
// Defines the retry policy
private static KafkaSpoutRetryService getRetryService() {
return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
}
}
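One setting this example leaves at its default is the first-poll offset strategy, which decides where the spout starts reading when no committed offset exists. A hedged sketch of how the builder above could be extended, assuming the storm-kafka-client 1.2.x API:

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // Resume from the committed offset if one exists; otherwise start from the earliest offset
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }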

View File

@@ -0,0 +1,52 @@
package com.heibaiying.kafka.write;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.*;
/**
 * Data source that generates sample records
 */
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// Generate mock data
String lineData = productData();
spoutOutputCollector.emit(new Values("key",lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare( new Fields("key", "message"));
}
/**
 * Generates mock data
 */
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}
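The field names declared here are not arbitrary: the no-argument FieldNameBasedTupleToKafkaMapper used in WritingToKafkaApp below looks up the tuple fields named "key" and "message" to build the Kafka record. If a spout declares different names, the mapper can be pointed at them instead; a sketch assuming the mapper's two-argument constructor (the field names here are hypothetical):

        // Map the hypothetical tuple fields "id" and "line" to the Kafka record's key and value
        FieldNameBasedTupleToKafkaMapper<String, String> mapper =
                new FieldNameBasedTupleToKafkaMapper<>("id", "line");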

View File

@@ -0,0 +1,67 @@
package com.heibaiying.kafka.write;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import java.util.Properties;
/**
 * Writes data to a specific Kafka topic
 */
public class WritingToKafkaApp {
private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
private static final String TOPIC_NAME = "storm-topic";
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
// Define the Kafka producer properties
Properties props = new Properties();
        /*
         * Specify the list of broker addresses. The list does not need to contain every broker:
         * the producer discovers the other brokers from the ones given.
         * Providing at least two brokers is recommended for fault tolerance.
         */
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        /*
         * The acks parameter specifies how many partition replicas must receive a message
         * before the producer considers the write successful:
         * acks=0 : the producer does not wait for any response from the server
         * acks=1 : the producer gets a success response as soon as the cluster's leader node receives the message
         * acks=all : the producer gets a success response only after all replicating nodes have received the message
         */
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt<String, String>()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());
builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWritingToKafkaApp",
new Config(), builder.createTopology());
}
}
}
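To confirm that the topology is actually producing, the topic can be watched with any Kafka consumer. A minimal standalone sketch using the kafka-clients dependency already declared in this module's pom (the class name, group ID, and poll timeout are arbitrary choices):

package com.heibaiying.kafka.write;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicWatcher {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("group.id", "topic-watcher"); // arbitrary group ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("storm-topic"));
            while (true) {
                // Poll and print whatever the topology has written to the topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " : " + record.value());
                }
            }
        }
    }
}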

View File

@@ -23,18 +23,52 @@
                    <target>8</target>
                </configuration>
            </plugin>
+           <!-- Package with the shade plugin -->
            <plugin>
-               <artifactId>maven-assembly-plugin</artifactId>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-shade-plugin</artifactId>
                <configuration>
-                   <descriptors>
-                       <descriptor>src/main/resources/assembly.xml</descriptor>
-                   </descriptors>
-                   <archive>
-                       <manifest>
-                           <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
-                       </manifest>
-                   </archive>
+                   <createDependencyReducedPom>true</createDependencyReducedPom>
+                   <filters>
+                       <filter>
+                           <artifact>*:*</artifact>
+                           <excludes>
+                               <exclude>META-INF/*.SF</exclude>
+                               <exclude>META-INF/*.sf</exclude>
+                               <exclude>META-INF/*.DSA</exclude>
+                               <exclude>META-INF/*.dsa</exclude>
+                               <exclude>META-INF/*.RSA</exclude>
+                               <exclude>META-INF/*.rsa</exclude>
+                               <exclude>META-INF/*.EC</exclude>
+                               <exclude>META-INF/*.ec</exclude>
+                               <exclude>META-INF/MSFTSIG.SF</exclude>
+                               <exclude>META-INF/MSFTSIG.RSA</exclude>
+                           </excludes>
+                       </filter>
+                   </filters>
+                   <artifactSet>
+                       <excludes>
+                           <exclude>org.apache.storm:storm-core</exclude>
+                       </excludes>
+                   </artifactSet>
                </configuration>
+               <executions>
+                   <execution>
+                       <phase>package</phase>
+                       <goals>
+                           <goal>shade</goal>
+                       </goals>
+                       <configuration>
+                           <transformers>
+                               <transformer
+                                       implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                               <transformer
+                                       implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                               </transformer>
+                           </transformers>
+                       </configuration>
+                   </execution>
+               </executions>
            </plugin>
        </plugins>
    </build>

View File

@@ -17,10 +17,6 @@ import org.apache.storm.topology.TopologyBuilder;
/**
 * Counts word frequencies and stores the results in Redis
- * <p>
- * Build and package: mvn clean assembly:assembly -Dmaven.test.skip=true
- * Submit the Topology to the cluster: storm jar /usr/appjar/storm-redis-integration-1.0-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster
- * Kill the Topology: storm kill ClusterWordCountApp -w 3
 */
public class WordCountToRedisApp {

View File

@@ -1,25 +0,0 @@
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>with-dependencies</id>
<!-- Specify the packaging format -->
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<!-- Exclude storm-core, which the Storm runtime already provides -->
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

Six binary image files added (not shown): 32 KiB, 38 KiB, 47 KiB, 4.8 KiB, 52 KiB, and 27 KiB.