Update Hadoop-MapReduce.md
This commit is contained in:
parent
c64fe66913
commit
c3e8b9e6d5
@ -82,11 +82,11 @@ combiner是map运算后的可选操作,其实际上是一个本地化的reduce
|
|||||||
|
|
||||||
不使用combiner的情况:
|
不使用combiner的情况:
|
||||||
|
|
||||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/mapreduce-without-combiners.png"/> </div>
|
<div align="center"> <img width="600px" src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/mapreduce-without-combiners.png"/> </div>
|
||||||
|
|
||||||
使用combiner的情况:
|
使用combiner的情况:
|
||||||
|
|
||||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/mapreduce-with-combiners.png"/> </div>
|
<div align="center"> <img width="600px" src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/mapreduce-with-combiners.png"/> </div>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -112,7 +112,7 @@ Merge是怎样的?如“aaa”从某个map task读取过来时值是5,从另
|
|||||||
|
|
||||||
### 4.1 项目简介
|
### 4.1 项目简介
|
||||||
|
|
||||||
这里给出一个经典的案例:词频统计。统计如下样本数据中每个单词出现的次数。
|
这里给出一个经典的案例:词频统计。统计如下样本数据中每个单词出现的次数。
|
||||||
|
|
||||||
```properties
|
```properties
|
||||||
Spark HBase
|
Spark HBase
|
||||||
@ -132,9 +132,6 @@ HBase Hive
|
|||||||
|
|
||||||
为方便大家开发,我在项目源码中放置了一个工具类`WordCountDataUtils`,用于产生词频统计样本文件:
|
为方便大家开发,我在项目源码中放置了一个工具类`WordCountDataUtils`,用于产生词频统计样本文件:
|
||||||
|
|
||||||
+ 支持产生样本文件到本地,适用于本地测试;
|
|
||||||
+ 支持产生样本文件并直接输出到HDFS,适用于提交到服务器测试;
|
|
||||||
|
|
||||||
> 本篇文章所有源码下载地址:[hadoop-word-count](https://github.com/heibaiying/BigData-Notes/tree/master/code/Hadoop/hadoop-word-count)
|
> 本篇文章所有源码下载地址:[hadoop-word-count](https://github.com/heibaiying/BigData-Notes/tree/master/code/Hadoop/hadoop-word-count)
|
||||||
|
|
||||||
```java
|
```java
|
||||||
@ -143,7 +140,8 @@ HBase Hive
|
|||||||
*/
|
*/
|
||||||
public class WordCountDataUtils {
|
public class WordCountDataUtils {
|
||||||
|
|
||||||
public static final List<String> WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
|
public static final List<String> WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase",
|
||||||
|
"Storm", "Flink", "Hive");
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -215,8 +213,6 @@ public class WordCountDataUtils {
|
|||||||
|
|
||||||
### 4.2 WordCountMapper
|
### 4.2 WordCountMapper
|
||||||
|
|
||||||
**Mapper代码实现**:
|
|
||||||
|
|
||||||
```java
|
```java
|
||||||
/**
|
/**
|
||||||
* 将每行数据按照指定分隔符进行拆分
|
* 将每行数据按照指定分隔符进行拆分
|
||||||
@ -224,7 +220,8 @@ public class WordCountDataUtils {
|
|||||||
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
|
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
|
protected void map(LongWritable key, Text value, Context context) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
String[] words = value.toString().split("\t");
|
String[] words = value.toString().split("\t");
|
||||||
for (String word : words) {
|
for (String word : words) {
|
||||||
context.write(new Text(word), new IntWritable(1));
|
context.write(new Text(word), new IntWritable(1));
|
||||||
@ -236,7 +233,7 @@ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritabl
|
|||||||
|
|
||||||
**代码说明**:
|
**代码说明**:
|
||||||
|
|
||||||
Splitting操作已由由Hadoop程序帮我们完成的,WordCountMapper对于下图的Mapping操作,这里WordCountMapper继承自Mapper类,这是一个泛型类,定义如下:
|
WordCountMapper对应下图的Mapping操作,这里WordCountMapper继承自Mapper类,这是一个泛型类,定义如下:
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
||||||
@ -244,10 +241,10 @@ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
+ KEYIN : mapping输入的key的数据类型,即每行的偏移量(每行第一个字符在文本中的位置),Long类型,对应Hadoop中的LongWritable类型;
|
+ **KEYIN** : mapping输入的key的数据类型,即每行的偏移量(每行第一个字符在文本中的位置),Long类型,对应Hadoop中的LongWritable类型;
|
||||||
+ VALUEIN : mappin输入的value的数据类型,即每行数据;String类型,对应Hadoop中Text类型;
|
+ **VALUEIN** : mappin输入的value的数据类型,即每行数据;String类型,对应Hadoop中Text类型;
|
||||||
+ KEYOUT :mapping输出的key的数据类型,即每个单词;String类型,对应Hadoop中Text类型;
|
+ **KEYOUT** :mapping输出的key的数据类型,即每个单词;String类型,对应Hadoop中Text类型;
|
||||||
+ VALUEOUT:mapping输出的value的数据类型,即每个单词出现的次数;这里用int类型,对应Hadoop中IntWritable类型;
|
+ **VALUEOUT**:mapping输出的value的数据类型,即每个单词出现的次数;这里用int类型,对应Hadoop中IntWritable类型;
|
||||||
|
|
||||||
在MapReduce中必须使用Hadoop定义的类型,因为Hadoop预定义的类型都是可序列化,可比较的,所有类型均实现了`WritableComparable`接口。
|
在MapReduce中必须使用Hadoop定义的类型,因为Hadoop预定义的类型都是可序列化,可比较的,所有类型均实现了`WritableComparable`接口。
|
||||||
|
|
||||||
@ -255,8 +252,6 @@ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
|||||||
|
|
||||||
### 4.3 WordCountReducer
|
### 4.3 WordCountReducer
|
||||||
|
|
||||||
**Reducer代码实现**:
|
|
||||||
|
|
||||||
```java
|
```java
|
||||||
/**
|
/**
|
||||||
* 进行词频统计
|
* 进行词频统计
|
||||||
@ -264,7 +259,8 @@ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
|||||||
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
|
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
|
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (IntWritable value : values) {
|
for (IntWritable value : values) {
|
||||||
count += value.get();
|
count += value.get();
|
||||||
@ -276,7 +272,7 @@ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritab
|
|||||||
|
|
||||||
**代码说明**:
|
**代码说明**:
|
||||||
|
|
||||||
这里的key显然就是每个单词,这里的values是一个可迭代的数据类型,因为shuffling输出的数据实际上是下图中所示的这样的,即`key,(1,1,1,1,1,1,1,.....)`。values是可迭代的。
|
这里的key是每个单词,这里的values是一个可迭代的数据类型,因为shuffling输出的数据实际上是下图中所示的这样的,即`key,(1,1,1,1,1,1,1,.....)`。
|
||||||
|
|
||||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/hadoop-code-reducer.png"/> </div>
|
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/hadoop-code-reducer.png"/> </div>
|
||||||
|
|
||||||
@ -389,9 +385,9 @@ hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
|
|||||||
|
|
||||||
## 五、词频统计案例进阶
|
## 五、词频统计案例进阶
|
||||||
|
|
||||||
### 5.1 combiner
|
## 5.1 combiner
|
||||||
|
|
||||||
#### 1. combiner的代码实现
|
### 1. combiner的代码实现
|
||||||
|
|
||||||
combiner的代码实现比较简单,只要在组装作业时,添加下面一行代码即可
|
combiner的代码实现比较简单,只要在组装作业时,添加下面一行代码即可
|
||||||
|
|
||||||
@ -400,7 +396,7 @@ combiner的代码实现比较简单,只要在组装作业时,添加下面一
|
|||||||
job.setCombinerClass(WordCountReducer.class);
|
job.setCombinerClass(WordCountReducer.class);
|
||||||
```
|
```
|
||||||
|
|
||||||
#### 2. 测试结果
|
### 2. 测试结果
|
||||||
|
|
||||||
加入combiner后统计结果是不会有变化的,但是我们可以从打印的日志看出combiner的效果:
|
加入combiner后统计结果是不会有变化的,但是我们可以从打印的日志看出combiner的效果:
|
||||||
|
|
||||||
@ -414,9 +410,9 @@ job.setCombinerClass(WordCountReducer.class);
|
|||||||
|
|
||||||
这里我们只有一个输入文件并且小于128M,所以只有一个Map进行处理,可以看到经过combiner后,records由3519降低为6(样本中单词种类就只有6个),这一点从图中日志的`reduce input records`参数也可以看出来。在这个用例中combiner的效果就非常明显。
|
这里我们只有一个输入文件并且小于128M,所以只有一个Map进行处理,可以看到经过combiner后,records由3519降低为6(样本中单词种类就只有6个),这一点从图中日志的`reduce input records`参数也可以看出来。在这个用例中combiner的效果就非常明显。
|
||||||
|
|
||||||
### 5.2 Partitioner
|
## 5.2 Partitioner
|
||||||
|
|
||||||
#### 1. 默认Partitioner规则
|
### 1. 默认Partitioner规则
|
||||||
|
|
||||||
这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品分类输出。
|
这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品分类输出。
|
||||||
|
|
||||||
@ -435,7 +431,7 @@ public class HashPartitioner<K, V> extends Partitioner<K, V> {
|
|||||||
|
|
||||||
对key进行哈希散列并对`numReduceTasks`取余,这里由于`numReduceTasks`默认值为1,所以我们之前的统计结果都输出到同一个文件中。
|
对key进行哈希散列并对`numReduceTasks`取余,这里由于`numReduceTasks`默认值为1,所以我们之前的统计结果都输出到同一个文件中。
|
||||||
|
|
||||||
#### 2. 自定义Partitioner
|
### 2. 自定义Partitioner
|
||||||
|
|
||||||
这里我们继承`Partitioner`自定义分区规则,这里按照单词进行分区:
|
这里我们继承`Partitioner`自定义分区规则,这里按照单词进行分区:
|
||||||
|
|
||||||
@ -462,7 +458,7 @@ job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
#### 3. 测试结果
|
### 3. 测试结果
|
||||||
|
|
||||||
测试结果如下,分别生成6个文件,每个文件中为对应单词的统计结果。
|
测试结果如下,分别生成6个文件,每个文件中为对应单词的统计结果。
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user