Using the HDFS API to Implement Basic Operations on the Hadoop HDFS File System
Published: 2021-06-30 17:51:08 | Category: Technical Articles


This article shows how to use the Java API of Hadoop's HDFS distributed file system to perform basic file operations: creating files, modifying files, creating directories (folders), uploading files from the local file system to HDFS, and downloading files from HDFS to the local file system.

IDE: Eclipse (with the Hadoop plugin already installed)

Note on the paths used in the main function below:

/user/liangyihuai and /usr/ are paths on HDFS, not local paths.

/usr/local/hadoop/README.txt is a local path.
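
If you are unsure which file system a given path string will resolve to, you can ask the Path object itself. The following is a minimal sketch (the class name PathCheck and the printed URIs are my illustration, not part of the original article): a Path with no scheme resolves against fs.defaultFS, while FileSystem.getLocal always returns the local file system.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/usr/local/hadoop/etc/hadoop/core-site.xml"));

        // A scheme-less Path resolves against fs.defaultFS from core-site.xml.
        FileSystem hdfs = new Path("/user/liangyihuai").getFileSystem(conf);
        // FileSystem.getLocal(conf) always returns the local file system.
        FileSystem local = FileSystem.getLocal(conf);

        System.out.println(hdfs.getUri());  // expected: hdfs://localhost:9000
        System.out.println(local.getUri()); // expected: file:///
    }
}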

The relevant content of the Hadoop configuration file core-site.xml is:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
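
As an aside, loading core-site.xml from a hard-coded path (as the constructor in the listing below does) is not the only option; the NameNode address can also be passed directly as a URI. A minimal sketch, assuming the hdfs://localhost:9000 address from the configuration above (the class name ConnectByUri is my own):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ConnectByUri {
    public static void main(String[] args) throws Exception {
        // Pass the NameNode address directly instead of loading
        // core-site.xml from a hard-coded path.
        FileSystem hdfs = FileSystem.get(
                URI.create("hdfs://localhost:9000"), new Configuration());
        System.out.println("Connected to: " + hdfs.getUri());
    }
}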

package com.huai.copy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MyHDFS {

    private Configuration conf = null;

    public MyHDFS() {
        this.conf = new Configuration(false);
        conf.addResource(new Path("/usr/local/hadoop/etc/hadoop/core-site.xml"));
    }

    // Upload a local file to HDFS using the built-in copyFromLocalFile helper.
    public void upload(String src, String dst) throws IOException {
        Path srcPath = new Path(src);
        Path dstPath = new Path(dst);

        FileSystem hdfs = dstPath.getFileSystem(conf);
        hdfs.mkdirs(dstPath);
        hdfs.copyFromLocalFile(false, srcPath, dstPath);
        System.out.println("MyHdfs, copy file ok");

        FileStatus[] fileStatuses = hdfs.listStatus(dstPath);
        for (FileStatus status : fileStatuses) {
            System.out.println(status);
        }
    }

    // Upload a local file to HDFS by copying the bytes between streams.
    public void uploadFileToHDFS(String src, String dst) throws IOException {
        Path srcPath = new Path(src);
        Path dstPath = new Path(dst);

        FileSystem localfs = FileSystem.getLocal(conf);
        FileSystem hdfs = FileSystem.get(conf);

        FSDataInputStream in = localfs.open(srcPath);
        FSDataOutputStream out = hdfs.create(dstPath);

        byte[] b = new byte[1024];
        int len = 0;
        while ((len = in.read(b)) != -1) {
            out.write(b, 0, len);
        }
        in.close();
        out.close();
    }

    // Read a file on HDFS and print its content to stdout.
    public void readFileInHDFS(String src) throws IOException {
        Path srcPath = new Path(src);

        FileSystem srcFS = FileSystem.get(this.conf);

        FSDataInputStream inputStream = srcFS.open(srcPath);

        byte[] b = new byte[1024];
        int len = 0;
        while ((len = inputStream.read(b)) != -1) {
            // Only convert the bytes actually read; converting the whole
            // buffer would also print stale data from previous iterations.
            System.out.print(new String(b, 0, len));
        }

        inputStream.close();
    }

    // Write (overwrite) a string to a file on HDFS.
    public void writeInHDFS(String path, String content) throws IOException {
        Path dfsPath = new Path(path);

        FileSystem hdfs = FileSystem.get(conf);

        byte[] b = content.getBytes();

        FSDataOutputStream outputStream = hdfs.create(dfsPath, true, b.length);
        outputStream.write(b, 0, b.length);
        outputStream.close();
    }

    // Download a file from HDFS using the built-in copyToLocalFile helper.
    public void download1(String src, String dst) throws IOException {
        Path srcPath = new Path(src);
        Path dstPath = new Path(dst);

        FileSystem hdfs = FileSystem.get(conf);

        hdfs.copyToLocalFile(srcPath, dstPath);
    }

    // Download a file from HDFS by copying the bytes between streams.
    public void download(String src, String dst) throws IOException {
        Path srcPath = new Path(src);
        Path dstPath = new Path(dst);

        FileSystem hdfs = FileSystem.get(conf);
        // The destination is on the local disk, so the local file system is
        // needed here (the original used FileSystem.get(conf), which would
        // have written the copy back to HDFS).
        FileSystem localfs = FileSystem.getLocal(conf);

        FSDataInputStream inputStream = hdfs.open(srcPath);
        FSDataOutputStream outputStream = localfs.create(dstPath);

        byte[] b = new byte[1024];
        int len = 0;
        while ((len = inputStream.read(b)) != -1) {
            outputStream.write(b, 0, len);
        }
        inputStream.close();
        outputStream.close();
    }

    // Create a file on HDFS with some fixed content.
    public void createFileInHDFS(String path) throws IOException {
        FileSystem hdfs = FileSystem.get(conf);
        byte[] buff = "hello hadoop world !".getBytes();

        Path dfsPath = new Path(path);

        FSDataOutputStream outputStream = hdfs.create(dfsPath);
        outputStream.write(buff, 0, buff.length);

        System.out.println("create file finished");

        outputStream.close();
    }

    // Create a directory on HDFS (including missing parents).
    public void makeDirInHDFS(String path) throws IOException {
        FileSystem hdfs = FileSystem.get(conf);

        Path dfsPath = new Path(path);

        hdfs.mkdirs(dfsPath);

        System.out.println("make dir finished");
    }

    // Rename a file or directory on HDFS.
    public void renameInHDFS(String oldNamePath, String newNamePath) throws IOException {
        FileSystem hdfs = FileSystem.get(conf);

        Path oldPath = new Path(oldNamePath);
        Path newPath = new Path(newNamePath);

        boolean isRenamed = hdfs.rename(oldPath, newPath);
        System.out.println(isRenamed ? "yes" : "no");
    }

    // Delete a file or directory on HDFS (recursively).
    public void deleteFileInHDFS(String path) throws IOException {
        Path dfsPath = new Path(path);
        FileSystem hdfs = FileSystem.get(conf);

        boolean isDeleted = hdfs.delete(dfsPath, true);

        System.out.println(isDeleted ? "yes" : "no");
    }

    public static void main(String[] args) throws IOException {
        String src = "/usr/local/hadoop/README.txt";
        String dst = "/user/liangyihuai/";
        MyHDFS t = new MyHDFS();
//        t.upload(src, dst);

        // Read (or download) a file on HDFS
//        t.readFileInHDFS("/helloHadoop.txt");

        // Create a file on HDFS
//        t.createFileInHDFS("/helloHadoop.txt");

        // Create a directory on HDFS
//        t.makeDirInHDFS("/user/liangyihuai/");

        // Rename a file or directory on HDFS
//        t.renameInHDFS("/user/liangyihuai", "/user/huai");
//        t.renameInHDFS("/README.txt", "/readme.txt");

        // Delete a file on HDFS
//        t.deleteFileInHDFS("/readme.txt");

        // Write to a file on HDFS
//        t.writeInHDFS("/helloHadoop.txt", "hello\nthis is \nliang \nyi \nhuai");

        // Download
        t.download1("/helloHadoop.txt", "/home/liangyihuai/hellohadoop.txt");

        // Upload a file to HDFS
//        t.uploadFileToHDFS("/home/liangyihuai/input.txt", "/input.txt");
    }
}
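
As a side note, the manual byte[] copy loops in uploadFileToHDFS, readFileInHDFS, and download can be replaced with Hadoop's bundled org.apache.hadoop.io.IOUtils.copyBytes helper. Below is a minimal sketch that prints an HDFS file to stdout (the class name CatHDFS and the /helloHadoop.txt path are illustrative assumptions, not from the original article):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CatHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        try (FSDataInputStream in = hdfs.open(new Path("/helloHadoop.txt"))) {
            // Copy the HDFS file to stdout with a 4 KB buffer; the last
            // argument controls whether the streams are closed afterwards
            // (false here, since try-with-resources closes the input).
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}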

Reposted from: https://liangyihuai.blog.csdn.net/article/details/49228383
