diff --git a/code/Hadoop/hadoop-word-count/pom.xml b/code/Hadoop/hadoop-word-count/pom.xml
new file mode 100644
index 0000000..7d3d29b
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>hadoop-word-count</artifactId>
+    <version>1.0</version>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>cloudera</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.8.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountApp.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountApp.java
new file mode 100644
index 0000000..21a3ff7
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountApp.java
@@ -0,0 +1,84 @@
+package com.heibaiying;
+
+import com.heibaiying.component.WordCountMapper;
+import com.heibaiying.component.WordCountReducer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.net.URI;
+
+/**
+ * Assembles the word-count job and submits it to the cluster.
+ */
+public class WordCountApp {
+
+    // Hard-coded here so the parameters are easy to see; in real development they can be passed in externally
+    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
+    private static final String HADOOP_USER_NAME = "root";
+
+    public static void main(String[] args) throws Exception {
+
+        // The input and output paths are supplied as external arguments
+        if (args.length < 2) {
+            System.out.println("Input and output paths are necessary!");
+            return;
+        }
+
+        // The Hadoop user name must be set, otherwise creating directories on HDFS may throw a permission exception
+        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
+
+        Configuration configuration = new Configuration();
+        // Specify the HDFS address
+        configuration.set("fs.defaultFS", HDFS_URL);
+
+        // Create a Job
+        Job job = Job.getInstance(configuration);
+
+        // Set the main class
+        job.setJarByClass(WordCountApp.class);
+
+        // Set the Mapper and Reducer
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(WordCountReducer.class);
+
+        // Set the key and value types of the Mapper output
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+
+        // Set the key and value types of the Reducer output
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        // If the output directory already exists it must be deleted first, otherwise re-running the job throws an exception
+        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
+        Path outputPath = new Path(args[1]);
+        if (fileSystem.exists(outputPath)) {
+            fileSystem.delete(outputPath, true);
+        }
+
+        // Set the input and output paths of the job
+        FileInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, outputPath);
+
+        // Submit the job to the cluster and wait for completion; passing true prints the job progress
+        boolean result = job.waitForCompletion(true);
+
+        // Close the FileSystem created earlier
+        fileSystem.close();
+
+        // Exit the JVM with a status code that reflects the job result
+        System.exit(result ? 0 : -1);
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerApp.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerApp.java
new file mode 100644
index 0000000..8867812
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerApp.java
@@ -0,0 +1,87 @@
+package com.heibaiying;
+
+import com.heibaiying.component.WordCountMapper;
+import com.heibaiying.component.WordCountReducer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.net.URI;
+
+/**
+ * Assembles the word-count job (with a Combiner) and submits it to the cluster.
+ */
+public class WordCountCombinerApp {
+
+    // Hard-coded here so the parameters are easy to see; in real development they can be passed in externally
+    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
+    private static final String HADOOP_USER_NAME = "root";
+
+    public static void main(String[] args) throws Exception {
+
+        // The input and output paths are supplied as external arguments
+        if (args.length < 2) {
+            System.out.println("Input and output paths are necessary!");
+            return;
+        }
+
+        // The Hadoop user name must be set, otherwise creating directories on HDFS may throw a permission exception
+        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
+
+        Configuration configuration = new Configuration();
+        // Specify the HDFS address
+        configuration.set("fs.defaultFS", HDFS_URL);
+
+        // Create a Job
+        Job job = Job.getInstance(configuration);
+
+        // Set the main class
+        job.setJarByClass(WordCountCombinerApp.class);
+
+        // Set the Mapper and Reducer
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(WordCountReducer.class);
+
+        // Set the Combiner
+        job.setCombinerClass(WordCountReducer.class);
+
+        // Set the key and value types of the Mapper output
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+
+        // Set the key and value types of the Reducer output
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        // If the output directory already exists it must be deleted first, otherwise re-running the job throws an exception
+        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
+        Path outputPath = new Path(args[1]);
+        if (fileSystem.exists(outputPath)) {
+            fileSystem.delete(outputPath, true);
+        }
+
+        // Set the input and output paths of the job
+        FileInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, outputPath);
+
+        // Submit the job to the cluster and wait for completion; passing true prints the job progress
+        boolean result = job.waitForCompletion(true);
+
+        // Close the FileSystem created earlier
+        fileSystem.close();
+
+        // Exit the JVM with a status code that reflects the job result
+        System.exit(result ? 0 : -1);
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerPartitionerApp.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerPartitionerApp.java
new file mode 100644
index 0000000..a29d78d
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/WordCountCombinerPartitionerApp.java
@@ -0,0 +1,95 @@
+package com.heibaiying;
+
+import com.heibaiying.component.CustomPartitioner;
+import com.heibaiying.component.WordCountMapper;
+import com.heibaiying.component.WordCountReducer;
+import com.heibaiying.utils.WordCountDataUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.net.URI;
+
+/**
+ * Assembles the word-count job (with a Combiner and a custom Partitioner) and submits it to the cluster.
+ */
+public class WordCountCombinerPartitionerApp {
+
+    // Hard-coded here so the parameters are easy to see; in real development they can be passed in externally
+    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
+    private static final String HADOOP_USER_NAME = "root";
+
+    public static void main(String[] args) throws Exception {
+
+        // The input and output paths are supplied as external arguments
+        if (args.length < 2) {
+            System.out.println("Input and output paths are necessary!");
+            return;
+        }
+
+        // The Hadoop user name must be set, otherwise creating directories on HDFS may throw a permission exception
+        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
+
+        Configuration configuration = new Configuration();
+        // Specify the HDFS address
+        configuration.set("fs.defaultFS", HDFS_URL);
+
+        // Create a Job
+        Job job = Job.getInstance(configuration);
+
+        // Set the main class
+        job.setJarByClass(WordCountCombinerPartitionerApp.class);
+
+        // Set the Mapper and Reducer
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(WordCountReducer.class);
+
+        // Set the Combiner
+        job.setCombinerClass(WordCountReducer.class);
+
+        // Set the custom partitioning rule
+        job.setPartitionerClass(CustomPartitioner.class);
+        // Set the number of reduce tasks (one per word)
+        job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());
+
+        // Set the key and value types of the Mapper output
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+
+        // Set the key and value types of the Reducer output
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        // If the output directory already exists it must be deleted first, otherwise re-running the job throws an exception
+        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
+        Path outputPath = new Path(args[1]);
+        if (fileSystem.exists(outputPath)) {
+            fileSystem.delete(outputPath, true);
+        }
+
+        // Set the input and output paths of the job
+        FileInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, outputPath);
+
+        // Submit the job to the cluster and wait for completion; passing true prints the job progress
+        boolean result = job.waitForCompletion(true);
+
+        // Close the FileSystem created earlier
+        fileSystem.close();
+
+        // Exit the JVM with a status code that reflects the job result
+        System.exit(result ? 0 : -1);
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/CustomPartitioner.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/CustomPartitioner.java
new file mode 100644
index 0000000..8cf50ed
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/CustomPartitioner.java
@@ -0,0 +1,16 @@
+package com.heibaiying.component;
+
+import com.heibaiying.utils.WordCountDataUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Custom Partitioner that assigns each word to its own partition.
+ */
+public class CustomPartitioner extends Partitioner<Text, IntWritable> {
+
+    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
+        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountMapper.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountMapper.java
new file mode 100644
index 0000000..26b6451
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountMapper.java
@@ -0,0 +1,23 @@
+package com.heibaiying.component;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Splits each line of input on the tab delimiter and emits (word, 1) pairs.
+ */
+public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        String[] words = value.toString().split("\t");
+        for (String word : words) {
+            context.write(new Text(word), new IntWritable(1));
+        }
+    }
+
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountReducer.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountReducer.java
new file mode 100644
index 0000000..be96e23
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/component/WordCountReducer.java
@@ -0,0 +1,22 @@
+package com.heibaiying.component;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * Sums the counts for each word.
+ */
+public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+        int count = 0;
+        for (IntWritable value : values) {
+            count += value.get();
+        }
+        context.write(key, new IntWritable(count));
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/utils/WordCountDataUtils.java b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/utils/WordCountDataUtils.java
new file mode 100644
index 0000000..de04ccc
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/java/com/heibaiying/utils/WordCountDataUtils.java
@@ -0,0 +1,91 @@
+package com.heibaiying.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generates mock data for the word-frequency job.
+ */
+public class WordCountDataUtils {
+
+    public static final List<String> WORD_LIST = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
+
+    /**
+     * Generate mock word-frequency data
+     *
+     * @return the generated data
+     */
+    private static String generateData() {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            Collections.shuffle(WORD_LIST);
+            Random random = new Random();
+            int endIndex = random.nextInt(WORD_LIST.size()) % (WORD_LIST.size()) + 1;
+            String line = StringUtils.join(WORD_LIST.toArray(), "\t", 0, endIndex);
+            builder.append(line).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Generate mock word-frequency data and write it to a local file
+     *
+     * @param outputPath output file path
+     */
+    private static void generateDataToLocal(String outputPath) {
+        try {
+            java.nio.file.Path path = Paths.get(outputPath);
+            if (Files.exists(path)) {
+                Files.delete(path);
+            }
+            Files.write(path, generateData().getBytes(), StandardOpenOption.CREATE);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Generate mock word-frequency data and write it to HDFS
+     *
+     * @param hdfsUrl          HDFS address
+     * @param user             hadoop user name
+     * @param outputPathString target path on HDFS
+     */
+    private static void generateDataToHDFS(String hdfsUrl, String user, String outputPathString) {
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = FileSystem.get(new URI(hdfsUrl), new Configuration(), user);
+            Path outputPath = new Path(outputPathString);
+            if (fileSystem.exists(outputPath)) {
+                fileSystem.delete(outputPath, true);
+            }
+            FSDataOutputStream out = fileSystem.create(outputPath);
+            out.write(generateData().getBytes());
+            out.flush();
+            out.close();
+            fileSystem.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String[] args) {
+        //generateDataToLocal("input.txt");
+        generateDataToHDFS("hdfs://192.168.0.107:8020", "root", "/wordcount/input.txt");
+    }
+}
diff --git a/code/Hadoop/hadoop-word-count/src/main/resources/log4j.properties b/code/Hadoop/hadoop-word-count/src/main/resources/log4j.properties
new file mode 100644
index 0000000..4525fba
--- /dev/null
+++ b/code/Hadoop/hadoop-word-count/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+log4j.rootLogger=INFO,CONSOLE
+log4j.additivity.org.apache=false
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss} -%-4r [%t] %-5p %x - %m%n
+log4j.appender.CONSOLE.Target=System.out
+log4j.appender.CONSOLE.Encoding=UTF-8
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
diff --git a/code/Hadoop/hdfs-java-api/pom.xml b/code/Hadoop/hdfs-java-api/pom.xml
new file mode 100644
index 0000000..e04cfbd
--- /dev/null
+++ b/code/Hadoop/hdfs-java-api/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>hdfs-java-api</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>cloudera</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java b/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java
new file mode 100644
index 0000000..16f9fea
--- /dev/null
+++ b/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java
@@ -0,0 +1,180 @@
+package com.heibaiying.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * HDFS utility class
+ */
+public class HdfsUtils {
+
+    private static final String HDFS_PATH = "hdfs://192.168.0.107:8020";
+    private static final String HDFS_USER = "root";
+    private static FileSystem fileSystem;
+
+    static {
+        try {
+            Configuration configuration = new Configuration();
+            configuration.set("dfs.replication", "1");
+            fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    /**
+     * Create a directory (recursive creation is supported)
+     *
+     * @param path directory path
+     * @return whether the creation succeeded
+     */
+    public static boolean mkdir(String path) throws Exception {
+        return fileSystem.mkdirs(new Path(path));
+    }
+
+    /**
+     * Read the content of a file
+     *
+     * @param path   file path
+     * @param encode character encoding of the file
+     * @return the file content as a string
+     */
+    public static String text(String path, String encode) throws Exception {
+        FSDataInputStream inputStream = fileSystem.open(new Path(path));
+        return inputStreamToString(inputStream, encode);
+    }
+
+    /**
+     * Create a file and write content to it
+     *
+     * @param path    file path
+     * @param context file content
+     */
+    public void createAndWrite(String path, String context) throws Exception {
+        FSDataOutputStream out = fileSystem.create(new Path(path));
+        out.writeUTF(context);
+        out.flush();
+        out.close();
+    }
+
+    /**
+     * Rename a file
+     *
+     * @param oldPath old file path
+     * @param newPath new file path
+     * @return whether the rename succeeded
+     */
+    public boolean rename(String oldPath, String newPath) throws Exception {
+        return fileSystem.rename(new Path(oldPath), new Path(newPath));
+    }
+
+    /**
+     * Copy a local file to HDFS
+     *
+     * @param localPath local file path
+     * @param hdfsPath  target path on HDFS
+     */
+    public void copyFromLocalFile(String localPath, String hdfsPath) throws Exception {
+        fileSystem.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
+    }
+
+    /**
+     * Download a file from HDFS
+     *
+     * @param hdfsPath  file path on HDFS
+     * @param localPath local target path
+     */
+    public void copyToLocalFile(String hdfsPath, String localPath) throws Exception {
+        fileSystem.copyToLocalFile(new Path(hdfsPath), new Path(localPath));
+    }
+
+    /**
+     * List the status of the files/directories under the given path
+     *
+     * @param path directory path
+     * @return array of file status objects
+     */
+    public FileStatus[] listFiles(String path) throws Exception {
+        return fileSystem.listStatus(new Path(path));
+    }
+
+    /**
+     * List the status and block locations of the files under the given path
+     *
+     * @param path directory path or file path
+     * @return iterator over the file status objects
+     */
+    public RemoteIterator<LocatedFileStatus> listFilesRecursive(String path, boolean recursive) throws Exception {
+        return fileSystem.listFiles(new Path(path), recursive);
+    }
+
+    /**
+     * Get the block locations of a file
+     *
+     * @param path file path
+     * @return array of block locations
+     */
+    public BlockLocation[] getFileBlockLocations(String path) throws Exception {
+        FileStatus fileStatus = fileSystem.getFileStatus(new Path(path));
+        return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    }
+
+    /**
+     * Delete a file
+     *
+     * @param path file path
+     * @return whether the deletion succeeded
+     */
+    public boolean delete(String path) throws Exception {
+        return fileSystem.delete(new Path(path), true);
+    }
+
+    /**
+     * Convert an input stream to a string using the given encoding
+     *
+     * @param inputStream input stream
+     * @param encode      character encoding
+     */
+    private static String inputStreamToString(InputStream inputStream, String encode) {
+        try {
+            if (encode == null || ("".equals(encode))) {
+                encode = "utf-8";
+            }
+            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
+            StringBuilder builder = new StringBuilder();
+            String str = "";
+            while ((str = reader.readLine()) != null) {
+                builder.append(str).append("\n");
+            }
+            return builder.toString();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+}
diff --git a/pictures/hadoop-combiner.png b/pictures/hadoop-combiner.png
new file mode 100644
index 0000000..e75c8b3
Binary files /dev/null and b/pictures/hadoop-combiner.png differ
diff --git a/pictures/hadoop-no-combiner.png b/pictures/hadoop-no-combiner.png
new file mode 100644
index 0000000..b946a8b
Binary files /dev/null and b/pictures/hadoop-no-combiner.png differ
diff --git a/pictures/hadoop-wordcountapp.png b/pictures/hadoop-wordcountapp.png
new file mode 100644
index 0000000..3bf6b95
Binary files /dev/null and b/pictures/hadoop-wordcountapp.png differ
diff --git a/pictures/hadoop-wordcountcombinerpartition.png b/pictures/hadoop-wordcountcombinerpartition.png
new file mode 100644
index 0000000..5315641
Binary files /dev/null and b/pictures/hadoop-wordcountcombinerpartition.png differ