mapreduce
@@ -0,0 +1,84 @@
package com.heibaiying;

import com.heibaiying.component.WordCountMapper;
import com.heibaiying.component.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Assembles the word-count job and submits it to the cluster.
 */
public class WordCountApp {

    // Hard-coded here so the values are easy to see; in real development they could be passed in from outside
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        // The input and output paths are supplied as external arguments
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // The Hadoop user name must be set; otherwise creating directories on HDFS may fail with a permission exception
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // Specify the HDFS address
        configuration.set("fs.defaultFS", HDFS_URL);

        // Create a Job
        Job job = Job.getInstance(configuration);

        // Set the main class of the job
        job.setJarByClass(WordCountApp.class);

        // Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the types of the Mapper's output key and value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the types of the Reducer's output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists it must be deleted first, otherwise re-running the program throws an exception
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Set the input and output paths of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the cluster and wait for it to finish; true means print the job's progress
        boolean result = job.waitForCompletion(true);

        // Close the FileSystem created earlier
        fileSystem.close();

        // Exit the JVM with a status code that reflects the job result
        System.exit(result ? 0 : -1);
    }
}
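For reference, here is a minimal sketch (not part of this commit; the class name and usage string are illustrative only) of how the hard-coded settings above could instead be supplied on the command line, using Hadoop's GenericOptionsParser to pick up generic options such as -D fs.defaultFS=hdfs://192.168.0.107:8020 before the input and output paths:

package com.heibaiying;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriverSketch {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // GenericOptionsParser consumes the generic Hadoop options (-D key=value, -fs, -files, ...)
        // into the Configuration and returns only the application-specific arguments.
        String[] remainingArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (remainingArgs.length < 2) {
            System.err.println("Usage: WordCountApp <input path> <output path>");
            System.exit(2);
        }
        // remainingArgs[0] and remainingArgs[1] would then serve as the input and output
        // paths, and `configuration` already carries fs.defaultFS from the command line.
    }
}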
@@ -0,0 +1,87 @@
package com.heibaiying;

import com.heibaiying.component.WordCountMapper;
import com.heibaiying.component.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Assembles the word-count job (with a Combiner) and submits it to the cluster.
 */
public class WordCountCombinerApp {

    // Hard-coded here so the values are easy to see; in real development they could be passed in from outside
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        // The input and output paths are supplied as external arguments
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // The Hadoop user name must be set; otherwise creating directories on HDFS may fail with a permission exception
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // Specify the HDFS address
        configuration.set("fs.defaultFS", HDFS_URL);

        // Create a Job
        Job job = Job.getInstance(configuration);

        // Set the main class of the job
        job.setJarByClass(WordCountCombinerApp.class);

        // Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the Combiner; the reducer can be reused here because summing counts is associative and commutative
        job.setCombinerClass(WordCountReducer.class);

        // Set the types of the Mapper's output key and value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the types of the Reducer's output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists it must be deleted first, otherwise re-running the program throws an exception
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Set the input and output paths of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the cluster and wait for it to finish; true means print the job's progress
        boolean result = job.waitForCompletion(true);

        // Close the FileSystem created earlier
        fileSystem.close();

        // Exit the JVM with a status code that reflects the job result
        System.exit(result ? 0 : -1);
    }
}
@@ -0,0 +1,95 @@
package com.heibaiying;

import com.heibaiying.component.CustomPartitioner;
import com.heibaiying.component.WordCountMapper;
import com.heibaiying.component.WordCountReducer;
import com.heibaiying.utils.WordCountDataUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Assembles the word-count job (with a Combiner and a custom Partitioner) and submits it to the cluster.
 */
public class WordCountCombinerPartitionerApp {

    // Hard-coded here so the values are easy to see; in real development they could be passed in from outside
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        // The input and output paths are supplied as external arguments
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // The Hadoop user name must be set; otherwise creating directories on HDFS may fail with a permission exception
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // Specify the HDFS address
        configuration.set("fs.defaultFS", HDFS_URL);

        // Create a Job
        Job job = Job.getInstance(configuration);

        // Set the main class of the job
        job.setJarByClass(WordCountCombinerPartitionerApp.class);

        // Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the Combiner
        job.setCombinerClass(WordCountReducer.class);

        // Set the custom partitioning rule
        job.setPartitionerClass(CustomPartitioner.class);
        // Set the number of reduce tasks: one per word, so each word's counts land in their own output file
        job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());

        // Set the types of the Mapper's output key and value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the types of the Reducer's output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists it must be deleted first, otherwise re-running the program throws an exception
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Set the input and output paths of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the cluster and wait for it to finish; true means print the job's progress
        boolean result = job.waitForCompletion(true);

        // Close the FileSystem created earlier
        fileSystem.close();

        // Exit the JVM with a status code that reflects the job result
        System.exit(result ? 0 : -1);
    }
}
@@ -0,0 +1,16 @@
package com.heibaiying.component;

import com.heibaiying.utils.WordCountDataUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom Partitioner that assigns each word to its own partition.
 */
public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}
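Note that getPartition above assumes every word emitted by the mapper appears in WORD_LIST; for any other word, indexOf returns -1 and the map task fails with an illegal-partition error. A defensive variant (hypothetical, not part of this commit; the class name is illustrative) could fall back to partition 0 for unknown words:

package com.heibaiying.component;

import com.heibaiying.utils.WordCountDataUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Hypothetical defensive variant of CustomPartitioner.
 */
public class SafeWordPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        int index = WordCountDataUtils.WORD_LIST.indexOf(text.toString());
        // Words outside WORD_LIST go to partition 0 instead of producing an invalid index
        return index >= 0 ? index % numPartitions : 0;
    }
}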
@@ -0,0 +1,23 @@
package com.heibaiying.component;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Splits each input line on the tab delimiter and emits (word, 1) for every token.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}
@@ -0,0 +1,22 @@
package com.heibaiying.component;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Sums the counts for each word to produce the word-frequency result.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
@@ -0,0 +1,91 @@
package com.heibaiying.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/**
 * Generates mock data for the word-frequency count.
 */
public class WordCountDataUtils {

    public static final List<String> WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    /**
     * Generates mock word-frequency data.
     *
     * @return the generated data
     */
    private static String generateData() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            Collections.shuffle(WORD_LIST);
            Random random = new Random();
            // Take a random prefix (1 to WORD_LIST.size() words) of the shuffled list as one line
            int endIndex = random.nextInt(WORD_LIST.size()) + 1;
            String line = StringUtils.join(WORD_LIST.toArray(), "\t", 0, endIndex);
            builder.append(line).append("\n");
        }
        return builder.toString();
    }

    /**
     * Generates mock word-frequency data and writes it to a local file.
     *
     * @param outputPath path of the output file
     */
    private static void generateDataToLocal(String outputPath) {
        try {
            java.nio.file.Path path = Paths.get(outputPath);
            if (Files.exists(path)) {
                Files.delete(path);
            }
            Files.write(path, generateData().getBytes(), StandardOpenOption.CREATE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Generates mock word-frequency data and writes it to HDFS.
     *
     * @param hdfsUrl          HDFS address
     * @param user             Hadoop user name
     * @param outputPathString destination path on HDFS
     */
    private static void generateDataToHDFS(String hdfsUrl, String user, String outputPathString) {
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(new URI(hdfsUrl), new Configuration(), user);
            Path outputPath = new Path(outputPathString);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
            FSDataOutputStream out = fileSystem.create(outputPath);
            out.write(generateData().getBytes());
            out.flush();
            out.close();
            fileSystem.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //generateDataToLocal("input.txt");
        generateDataToHDFS("hdfs://192.168.0.107:8020", "root", "/wordcount/input.txt");
    }
}
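As a side note, generateDataToHDFS only closes the output stream and the FileSystem on the success path. A sketch of the same method using try-with-resources (hypothetical, not part of this commit; it would live in WordCountDataUtils and behave identically otherwise) releases both even if the write fails part-way:

    // Hypothetical variant of generateDataToHDFS: try-with-resources closes both the
    // FileSystem and the output stream even when an exception is thrown mid-write.
    private static void generateDataToHDFSSafely(String hdfsUrl, String user, String outputPathString) {
        try (FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), new Configuration(), user)) {
            Path outputPath = new Path(outputPathString);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
            try (FSDataOutputStream out = fileSystem.create(outputPath)) {
                out.write(generateData().getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }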