From c3e8b9e6d5062b7fd5e914c92c744475ff52883e Mon Sep 17 00:00:00 2001 From: heibaiying <31504331+heibaiying@users.noreply.github.com> Date: Mon, 22 Apr 2019 22:32:30 +0800 Subject: [PATCH] Update Hadoop-MapReduce.md --- notes/Hadoop-MapReduce.md | 48 ++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/notes/Hadoop-MapReduce.md b/notes/Hadoop-MapReduce.md index 10b0e94..f329682 100644 --- a/notes/Hadoop-MapReduce.md +++ b/notes/Hadoop-MapReduce.md @@ -82,11 +82,11 @@ combiner是map运算后的可选操作,其实际上是一个本地化的reduce 不使用combiner的情况: -
+
使用combiner的情况: -
+
@@ -112,7 +112,7 @@ Merge是怎样的?如“aaa”从某个map task读取过来时值是5,从另 ### 4.1 项目简介 -这里给出一个经典的案例:词频统计。统计如下样本数据中每个单词出现的次数。 +这里给出一个经典的案例:词频统计。统计如下样本数据中每个单词出现的次数。 ```properties Spark HBase @@ -132,9 +132,6 @@ HBase Hive 为方便大家开发,我在项目源码中放置了一个工具类`WordCountDataUtils`,用于产生词频统计样本文件: -+ 支持产生样本文件到本地,适用于本地测试; -+ 支持产生样本文件并直接输出到HDFS,适用于提交到服务器测试; - > 本篇文章所有源码下载地址:[hadoop-word-count](https://github.com/heibaiying/BigData-Notes/tree/master/code/Hadoop/hadoop-word-count) ```java @@ -143,7 +140,8 @@ HBase Hive */ public class WordCountDataUtils { - public static final List WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); + public static final List WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase", + "Storm", "Flink", "Hive"); /** @@ -215,8 +213,6 @@ public class WordCountDataUtils { ### 4.2 WordCountMapper -**Mapper代码实现**: - ```java /** * 将每行数据按照指定分隔符进行拆分 @@ -224,7 +220,8 @@ public class WordCountDataUtils { public class WordCountMapper extends Mapper { @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + protected void map(LongWritable key, Text value, Context context) throws IOException, + InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { context.write(new Text(word), new IntWritable(1)); @@ -236,7 +233,7 @@ public class WordCountMapper extends Mapper { @@ -244,10 +241,10 @@ public class Mapper { } ``` -+ KEYIN : mapping输入的key的数据类型,即每行的偏移量(每行第一个字符在文本中的位置),Long类型,对应Hadoop中的LongWritable类型; -+ VALUEIN : mappin输入的value的数据类型,即每行数据;String类型,对应Hadoop中Text类型; -+ KEYOUT :mapping输出的key的数据类型,即每个单词;String类型,对应Hadoop中Text类型; -+ VALUEOUT:mapping输出的value的数据类型,即每个单词出现的次数;这里用int类型,对应Hadoop中IntWritable类型; ++ **KEYIN** : mapping输入的key的数据类型,即每行的偏移量(每行第一个字符在文本中的位置),Long类型,对应Hadoop中的LongWritable类型; ++ **VALUEIN** : mappin输入的value的数据类型,即每行数据;String类型,对应Hadoop中Text类型; ++ **KEYOUT** :mapping输出的key的数据类型,即每个单词;String类型,对应Hadoop中Text类型; ++ **VALUEOUT**:mapping输出的value的数据类型,即每个单词出现的次数;这里用int类型,对应Hadoop中IntWritable类型; 在MapReduce中必须使用Hadoop定义的类型,因为Hadoop预定义的类型都是可序列化,可比较的,所有类型均实现了`WritableComparable`接口。 @@ -255,8 +252,6 @@ public class Mapper { ### 4.3 WordCountReducer -**Reducer代码实现**: - ```java /** * 进行词频统计 @@ -264,7 +259,8 @@ public class Mapper { public class WordCountReducer extends Reducer { @Override - protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + protected void reduce(Text key, Iterable values, Context context) throws IOException, + InterruptedException { int count = 0; for (IntWritable value : values) { count += value.get(); @@ -276,7 +272,7 @@ public class WordCountReducer extends Reducer @@ -389,9 +385,9 @@ hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000 ## 五、词频统计案例进阶 -### 5.1 combiner +## 5.1 combiner -#### 1. combiner的代码实现 +### 1. combiner的代码实现 combiner的代码实现比较简单,只要在组装作业时,添加下面一行代码即可 @@ -400,7 +396,7 @@ combiner的代码实现比较简单,只要在组装作业时,添加下面一 job.setCombinerClass(WordCountReducer.class); ``` -#### 2. 测试结果 +### 2. 测试结果 加入combiner后统计结果是不会有变化的,但是我们可以从打印的日志看出combiner的效果: @@ -414,9 +410,9 @@ job.setCombinerClass(WordCountReducer.class); 这里我们只有一个输入文件并且小于128M,所以只有一个Map进行处理,可以看到经过combiner后,records由3519降低为6(样本中单词种类就只有6个),这一点从图中日志的`reduce input records`参数也可以看出来。在这个用例中combiner的效果就非常明显。 -### 5.2 Partitioner +## 5.2 Partitioner -#### 1. 默认Partitioner规则 +### 1. 默认Partitioner规则 这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品分类输出。 @@ -435,7 +431,7 @@ public class HashPartitioner extends Partitioner { 对key进行哈希散列并对`numReduceTasks`取余,这里由于`numReduceTasks`默认值为1,所以我们之前的统计结果都输出到同一个文件中。 -#### 2. 自定义Partitioner +### 2. 自定义Partitioner 这里我们继承`Partitioner`自定义分区规则,这里按照单词进行分区: @@ -462,7 +458,7 @@ job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size()); -#### 3. 测试结果 +### 3. 测试结果 测试结果如下,分别生成6个文件,每个文件中为对应单词的统计结果。