hdfs 客户端java 代码
发布日期:2021-06-28 21:03:40 浏览次数:3 分类:技术文章

本文共 8666 字,大约阅读时间需要 28 分钟。

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>day04_hdfs_api_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

hdfs 客户端java 代码

package cn.itcast.hdfs_api;import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.junit.Before;import org.junit.Test;import org.junit.experimental.theories.suppliers.TestedOn;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.net.MalformedURLException;import java.net.URI;import java.net.URISyntaxException;import java.net.URL;public class HdfsApiDemo {
FileSystem fileSystem = null; Configuration configuration = null; /** * HDFS的路径,core-site.xml中配置的端口号 */ public static final String HDFS_PATH = "hdfs://10.139.12.149:9000"; /** * 解决无权限访问,设置远程hadoop的linux用户名称 */ public static final String USER = "root"; /** * 单元测试之前的准备工作,准备环境,加载配置 * * @throws Exception */ @Before public void setUp() throws Exception {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.2"); System.out.println("HDFSApp is setUp....."); configuration = new Configuration(); fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, USER); //1:获取FileSystem(分布式文件系统) // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); } /* 小文件的合并 */ @Test public void mergeFile() throws URISyntaxException, IOException, InterruptedException {
//1:获取FileSystem(分布式文件系统) // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); //2:获取hdfs大文件的输出流 FSDataOutputStream outputStream = fileSystem.create(new Path("/big_txt.txt")); //3:获取一个本地文件系统 LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration()); //4:获取本地文件夹下所有文件的详情 FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("D:\\input")); //5:遍历每个文件,获取每个文件的输入流 for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = localFileSystem.open(fileStatus.getPath()); //6:将小文件的数据复制到大文件 IOUtils.copy(inputStream, outputStream); IOUtils.closeQuietly(inputStream); } //7:关闭流 IOUtils.closeQuietly(outputStream); localFileSystem.close(); fileSystem.close(); } /* 文件的上传 */ @Test public void uploadFile() throws URISyntaxException, IOException {
//1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); //2:调用方法,实现上传 fileSystem.copyFromLocalFile(new Path("E:\\apps\\hdfs\\hello.txt"), new Path("/user/ysw/input3/hello2.txt")); //3:关闭FileSystem fileSystem.close(); } /* 实现文件的下载:方式2 */ @Test public void downloadFile2() throws URISyntaxException, IOException, InterruptedException {
//1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); //2:调用方法,实现文件的下载 fileSystem.copyToLocalFile(new Path("/user/ysw/input3/hello2.txt"), new Path("E:\\apps\\hdfs\\hello1.txt")); //3:关闭FileSystem fileSystem.close(); } /* 实现文件的下载 */ @Test public void downloadFile() throws URISyntaxException, IOException {
//1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); //2:获取hdfs的输入流 FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt")); //3:获取本地路径的输出流 FileOutputStream outputStream = new FileOutputStream("D://a.txt"); //4:文件的拷贝 IOUtils.copy(inputStream, outputStream); //5:关闭流 IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); fileSystem.close(); } /* hdfs创建文件夹 */ @Test public void mkdirsTest() throws URISyntaxException, IOException {
//1:获取FileSystem实例 // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:9000"), new Configuration()); //2:创建文件夹 //boolean bl = fileSystem.mkdirs(new Path("/aaa/bbb/ccc")); fileSystem.create(new Path("/aaa/bbb/ccc/a.txt")); fileSystem.create(new Path("/aaa2/bbb/ccc/a.txt")); //System.out.println(bl); //3: 关闭FileSystem //fileSystem.close(); } /* hdfs文件的遍历 */ @Test public void listFiles() throws URISyntaxException, IOException {
//1:获取FileSystem实例 // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:9000"), new Configuration()); //2:调用方法listFiles 获取 /目录下所有的文件信息 RemoteIterator
iterator = fileSystem.listFiles(new Path("/"), true); //3:遍历迭代器 while (iterator.hasNext()){
LocatedFileStatus fileStatus = iterator.next(); //获取文件的绝对路径 : hdfs://kafka1:8020/xxx System.out.println(fileStatus.getPath() + "----" +fileStatus.getPath().getName()); //文件的block信息 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); System.out.println("block数:"+blockLocations.length); } } /* 获取FileSystem;方式4 */ @Test public void getFileSystem4() throws URISyntaxException, IOException {
// FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://kafka1:9000"), new Configuration()); System.out.println(fileSystem); } /* 获取FileSystem;方式3 */ @Test public void getFileSystem3() throws IOException {
// Configuration configuration = new Configuration(); //指定文件系统类型 // configuration.set("fs.defaultFS", "hdfs://kafka1:8020"); //获取指定的文件系统 FileSystem fileSystem = FileSystem.newInstance(configuration); System.out.println(fileSystem); } /* 获取FileSystem;方式2 */ @Test public void getFileSystem2() throws URISyntaxException, IOException {
// FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); System.out.println(fileSystem); } /* 获取FileSystem;方式1 */ @Test public void getFileSystem1() throws IOException {
//1:创建Configuration对象 Configuration configuration = new Configuration(); //2:设置文件系统的类型 configuration.set("fs.defaultFS", "hdfs://kafka1:9000"); //3:获取指定的文件系统 FileSystem fileSystem = FileSystem.get(configuration); //4:输出 System.out.println(fileSystem); } @Test public void urlHdfs() throws IOException {
//1:注册url URL.setURLStreamHandlerFactory( new FsUrlStreamHandlerFactory()); //2:获取hdfs文件的输入流 InputStream inputStream = new URL("hdfs://kafka1:9000/a.txt").openStream(); //3:获取本地文件的输出流 FileOutputStream outputStream = new FileOutputStream(new File("D:\\hello2.txt")); //4:实现文件的拷贝 IOUtils.copy(inputStream, outputStream); //5:关流 IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); }}

转载地址:https://blog.csdn.net/yangshengwei230612/article/details/117929940 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Spark性能调优实战--精华总结-极客时间 吴磊
下一篇:spark 安装

发表评论

最新留言

很好
[***.229.124.182]2024年04月19日 02时10分02秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章