strom整合其他框架

This commit is contained in:
罗祥
2019-04-17 17:55:19 +08:00
parent a339956c1a
commit d4daa775cf
13 changed files with 517 additions and 14 deletions

View File

@ -0,0 +1,77 @@
package com.heibaiying;
import com.heibaiying.component.DataSourceSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;
/**
* 进行词频统计 并将统计结果存储到HDFS中
* <p>
* hdfs://hadoopp001:8020 path
*/
public class WordCountToHdfsApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String HDFS_BOLT = "hdfsBolt";
public static void main(String[] args) {
// 定义存储文本的分隔符
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// 同步策略: 每100个tuples之后就会把数据从缓存刷新到HDFS中
SyncPolicy syncPolicy = new CountSyncPolicy(100);
// 文件策略: 每个文件大小上限1M,超过限定时,创建新文件并继续写入
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
// 定义完整路径
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm-hdfs/");
// 定义HdfsBolt
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://hadoop001:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
// 构建Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// save to HBase
builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);
// 如果外部传参cluster则代表线上环境启动,否则代表本地启动
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToHdfsApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToHdfsApp",
new Config(), builder.createTopology());
}
}
}

View File

@ -0,0 +1,53 @@
package com.heibaiying.component;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.*;
/**
* 产生词频样本的数据源
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// 模拟产生数据
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* 模拟数据
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
}