From 14e6e903167efbd92ac52d1e64446635f3d41355 Mon Sep 17 00:00:00 2001
From: luoxiang <2806718453@qq.com>
Date: Wed, 1 May 2019 11:28:28 +0800
Subject: [PATCH] HDFS java api

---
 README.md                                      |   2 +-
 .../java/com/heibaiying/utils/HdfsUtils.java   |   2 +-
 .../src/main/resources/log4j.properties        |   9 +
 .../hdfs-java-api/src/test/java/HdfsTest.java  | 248 ++++++++++++++++++
 4 files changed, 259 insertions(+), 2 deletions(-)
 create mode 100644 code/Hadoop/hdfs-java-api/src/main/resources/log4j.properties
 create mode 100644 code/Hadoop/hdfs-java-api/src/test/java/HdfsTest.java

diff --git a/README.md b/README.md
index 72d112f..92f4a9f 100644
--- a/README.md
+++ b/README.md
@@ -64,7 +64,7 @@
 5. [Hive 核心概念讲解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive核心概念讲解.md)
 6. [Hive 常用DDL操作](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive常用DDL操作.md)
 7. [Hive 分区表和分桶表](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive分区表和分桶表.md)
-8. [Hive 视图和索引](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive数据查询详解.md)
+8. [Hive 视图和索引](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive视图和索引.md)
 9. [Hive常用DML操作](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive常用DML操作.md)
 10. [Hive 数据查询详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hive数据查询详解.md)
 

diff --git a/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java b/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java
index 16f9fea..56205a1 100644
--- a/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java
+++ b/code/Hadoop/hdfs-java-api/src/main/java/com/heibaiying/utils/HdfsUtils.java
@@ -70,7 +70,7 @@ public class HdfsUtils {
      */
     public void createAndWrite(String path, String context) throws Exception {
         FSDataOutputStream out = fileSystem.create(new Path(path));
-        out.writeUTF(context);
+        out.write(context.getBytes());
         out.flush();
         out.close();
     }

diff --git a/code/Hadoop/hdfs-java-api/src/main/resources/log4j.properties b/code/Hadoop/hdfs-java-api/src/main/resources/log4j.properties
new file mode 100644
index 0000000..4525fba
--- /dev/null
+++ b/code/Hadoop/hdfs-java-api/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+log4j.rootLogger=INFO,CONSOLE
+log4j.additivity.org.apache=false
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss} -%-4r [%t] %-5p %x - %m%n
+log4j.appender.CONSOLE.Target=System.out
+log4j.appender.CONSOLE.Encoding=UTF-8
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout

diff --git a/code/Hadoop/hdfs-java-api/src/test/java/HdfsTest.java b/code/Hadoop/hdfs-java-api/src/test/java/HdfsTest.java
new file mode 100644
index 0000000..197c3c2
--- /dev/null
+++ b/code/Hadoop/hdfs-java-api/src/test/java/HdfsTest.java
@@ -0,0 +1,248 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Common HDFS Java API usage examples
+ */
+public class HdfsTest {
+
+    private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
+    private static final String HDFS_USER = "root";
+    private static FileSystem fileSystem;
+
+
+    /**
+     * Obtain the FileSystem instance
+     */
+    @Before
+    public void prepare() {
+        try {
+            Configuration configuration = new Configuration();
+            configuration.set("dfs.replication", "1");
+            fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * Create a directory; missing parent directories are created recursively
+     */
+    @Test
+    public void mkDir() throws Exception {
+        fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
+    }
+
+
+    /**
+     * Create a directory with the given permissions
+     */
+    @Test
+    public void mkDirWithPermission() throws Exception {
+        fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
+                new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
+    }
+
+    /**
+     * Create a file and write content to it
+     */
+    @Test
+    public void create() throws Exception {
+        // If the file already exists it is overwritten by default; the second parameter
+        // controls overwriting and the third sets the buffer size
+        FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
+                true, 4096);
+        out.write("hello hadoop!".getBytes());
+        out.write("hello spark!".getBytes());
+        out.write("hello flink!".getBytes());
+        // Force the buffered data out to the stream
+        out.flush();
+        out.close();
+    }
+
+
+    /**
+     * Check whether a file exists
+     */
+    @Test
+    public void exist() throws Exception {
+        boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
+        System.out.println(exists);
+    }
+
+
+    /**
+     * Read a file's content
+     */
+    @Test
+    public void readToString() throws Exception {
+        FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
+        String context = inputStreamToString(inputStream, "utf-8");
+        System.out.println(context);
+    }
+
+
+    /**
+     * Rename a file
+     */
+    @Test
+    public void rename() throws Exception {
+        Path oldPath = new Path("/hdfs-api/test/a.txt");
+        Path newPath = new Path("/hdfs-api/test/b.txt");
+        boolean result = fileSystem.rename(oldPath, newPath);
+        System.out.println(result);
+    }
+
+
+    /**
+     * Delete a file or directory
+     */
+    @Test
+    public void delete() throws Exception {
+        // The second parameter enables recursive deletion; if the path is a non-empty
+        // directory and it is false, an exception is thrown
+        boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
+        System.out.println(result);
+    }
+
+
+    /**
+     * Upload a file to HDFS
+     */
+    @Test
+    public void copyFromLocalFile() throws Exception {
+        // If the source is a directory, the directory and the files inside it are copied to the target directory
+        Path src = new Path("D:\\BigData-Notes\\notes\\installation");
+        Path dst = new Path("/hdfs-api/test/");
+        fileSystem.copyFromLocalFile(src, dst);
+    }
+
+    /**
+     * Upload a large file to HDFS and report progress
+     */
+    @Test
+    public void copyFromLocalBigFile() throws Exception {
+
+        File file = new File("D:\\kafka.tgz");
+        final float fileSize = file.length();
+        InputStream in = new BufferedInputStream(new FileInputStream(file));
+
+        FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
+                new Progressable() {
+                    long fileCount = 0;
+
+                    public void progress() {
+                        fileCount++;
+                        // progress() is invoked roughly once for every 64 KB of data written
+                        System.out.println("Upload progress: " + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
+                    }
+                });
+
+        IOUtils.copyBytes(in, out, 4096);
+
+    }
+
+    /**
+     * Download a file from HDFS
+     */
+    @Test
+    public void copyToLocalFile() throws Exception {
+        Path src = new Path("/hdfs-api/test/kafka.tgz");
+        Path dst = new Path("D:\\app\\");
+        /*
+         * The first parameter controls whether the source file is deleted after the copy;
+         * the two-argument overload defaults to false, i.e. the source is kept.
+         * The last parameter indicates whether to use RawLocalFileSystem as the local file system;
+         * it defaults to false and can usually be left unset, but if a NullPointerException is
+         * thrown at run time (common on Windows) the local file system is probably incompatible
+         * with the program, and setting it to true works around the problem.
+         */
+        fileSystem.copyToLocalFile(false, src, dst, true);
+    }
+
+
+    /**
+     * List the status of every entry under the target directory
+     */
+    @Test
+    public void listFiles() throws Exception {
+        FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
+        for (FileStatus fileStatus : statuses) {
+            // FileStatus overrides toString(), so printing it shows all of its fields
+            System.out.println(fileStatus.toString());
+        }
+    }
+
+
+    /**
+     * Recursively list all files under the target directory
+     */
+    @Test
+    public void listFilesRecursive() throws Exception {
+        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
+        while (files.hasNext()) {
+            System.out.println(files.next());
+        }
+    }
+
+
+    /**
+     * Inspect a file's block locations
+     */
+    @Test
+    public void getFileBlockLocations() throws Exception {
+
+        FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
+        BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+        for (BlockLocation block : blocks) {
+            System.out.println(block);
+        }
+    }
+
+
+    /**
+     * Release the FileSystem reference after the tests finish
+     */
+    @After
+    public void destroy() {
+        fileSystem = null;
+    }
+
+
+    /**
+     * Convert an input stream to a string using the given encoding
+     *
+     * @param inputStream the input stream
+     * @param encode      the character encoding to use
+     */
+    private static String inputStreamToString(InputStream inputStream, String encode) {
+        try {
+            if (encode == null || ("".equals(encode))) {
+                encode = "utf-8";
+            }
+            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
+            StringBuilder builder = new StringBuilder();
+            String str = "";
+            while ((str = reader.readLine()) != null) {
+                builder.append(str).append("\n");
+            }
+            return builder.toString();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+}
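
Note on the HdfsUtils change above: writeUTF() is a DataOutput method that prefixes the string
with a two-byte length and encodes it as modified UTF-8, so a file written that way shows extra
leading bytes when read back as plain text (for example with hdfs dfs -cat), while
out.write(context.getBytes()) writes only the raw bytes. The minimal sketch below is not part of
the patch; it assumes the same cluster address and user as HdfsTest, and the
/hdfs-api/test/demo.txt path is a hypothetical placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    public class WriteReadSketch {

        public static void main(String[] args) throws Exception {
            // Illustrative cluster address and user, matching the values used in HdfsTest
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.0.106:8020"), new Configuration(), "root");

            // Write raw bytes, as HdfsUtils#createAndWrite does after this patch
            Path path = new Path("/hdfs-api/test/demo.txt");  // hypothetical path
            FSDataOutputStream out = fs.create(path, true);
            out.write("hello hadoop!".getBytes(StandardCharsets.UTF_8));
            out.close();

            // Read the bytes back; there is no length prefix, so the text matches what was written
            FSDataInputStream in = fs.open(path);
            byte[] buffer = new byte[(int) fs.getFileStatus(path).getLen()];
            in.readFully(buffer);
            in.close();
            System.out.println(new String(buffer, StandardCharsets.UTF_8));

            fs.close();
        }
    }

With writeUTF() the same read would surface two extra bytes (the big-endian length) ahead of the
text, which is why such files look garbled at the start when viewed as plain text.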