modify
This commit is contained in:
		| @@ -12,6 +12,10 @@ import org.apache.storm.redis.common.config.JedisPoolConfig; | |||||||
| import org.apache.storm.redis.common.mapper.RedisStoreMapper; | import org.apache.storm.redis.common.mapper.RedisStoreMapper; | ||||||
| import org.apache.storm.topology.TopologyBuilder; | import org.apache.storm.topology.TopologyBuilder; | ||||||
|  |  | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 利用自定义的RedisBolt实现词频统计 | ||||||
|  |  */ | ||||||
| public class CustomRedisCountApp { | public class CustomRedisCountApp { | ||||||
|  |  | ||||||
|     private static final String REDIS_HOST = "192.168.200.226"; |     private static final String REDIS_HOST = "192.168.200.226"; | ||||||
|   | |||||||
| @@ -11,6 +11,9 @@ import org.apache.storm.tuple.Values; | |||||||
| import java.util.HashMap; | import java.util.HashMap; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 进行词频统计 | ||||||
|  |  */ | ||||||
| public class CountBolt extends BaseRichBolt { | public class CountBolt extends BaseRichBolt { | ||||||
|  |  | ||||||
|     private Map<String, Integer> counts = new HashMap<>(); |     private Map<String, Integer> counts = new HashMap<>(); | ||||||
|   | |||||||
| @@ -11,6 +11,10 @@ import org.apache.storm.utils.Utils; | |||||||
|  |  | ||||||
| import java.util.*; | import java.util.*; | ||||||
|  |  | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 产生词频样本的数据源 | ||||||
|  |  */ | ||||||
| public class DataSourceSpout extends BaseRichSpout { | public class DataSourceSpout extends BaseRichSpout { | ||||||
|  |  | ||||||
|     private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); |     private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); | ||||||
|   | |||||||
| @@ -8,6 +8,10 @@ import org.apache.storm.topology.OutputFieldsDeclarer; | |||||||
| import org.apache.storm.tuple.Tuple; | import org.apache.storm.tuple.Tuple; | ||||||
| import redis.clients.jedis.JedisCommands; | import redis.clients.jedis.JedisCommands; | ||||||
|  |  | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 自定义RedisBolt 利用Redis的哈希数据结构的hincrby key field功能进行词频统计 | ||||||
|  |  */ | ||||||
| public class RedisCountStoreBolt extends AbstractRedisBolt { | public class RedisCountStoreBolt extends AbstractRedisBolt { | ||||||
|  |  | ||||||
|     private final RedisStoreMapper storeMapper; |     private final RedisStoreMapper storeMapper; | ||||||
|   | |||||||
| @@ -10,6 +10,9 @@ import org.apache.storm.tuple.Values; | |||||||
|  |  | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 将每行数据按照指定分隔符进行拆分 | ||||||
|  |  */ | ||||||
| public class SplitBolt extends BaseRichBolt { | public class SplitBolt extends BaseRichBolt { | ||||||
|  |  | ||||||
|     private OutputCollector collector; |     private OutputCollector collector; | ||||||
|   | |||||||
| @@ -4,6 +4,9 @@ import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; | |||||||
| import org.apache.storm.redis.common.mapper.RedisStoreMapper; | import org.apache.storm.redis.common.mapper.RedisStoreMapper; | ||||||
| import org.apache.storm.tuple.ITuple; | import org.apache.storm.tuple.ITuple; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * 定义流数据与Redis中数据的映射关系 | ||||||
|  |  */ | ||||||
| public class  WordCountStoreMapper implements RedisStoreMapper { | public class  WordCountStoreMapper implements RedisStoreMapper { | ||||||
|     private RedisDataTypeDescription description; |     private RedisDataTypeDescription description; | ||||||
|     private final String hashKey = "wordCount"; |     private final String hashKey = "wordCount"; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user