hdfs 客户端java 代码
发布日期:2021-06-28 21:03:40
浏览次数:3
分类:技术文章
本文共 8666 字,大约阅读时间需要 28 分钟。
4.0.0 cn.itcast day04_hdfs_api_demo 1.0-SNAPSHOT org.apache.hadoop hadoop-common 2.7.2 org.apache.hadoop hadoop-client 2.7.2 org.apache.hadoop hadoop-hdfs 2.7.2 org.apache.hadoop hadoop-mapreduce-client-core 2.7.2 junit junit RELEASE org.apache.maven.plugins maven-compiler-plugin 3.1 org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade true
hdfs 客户端java 代码
package cn.itcast.hdfs_api;import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.junit.Before;import org.junit.Test;import org.junit.experimental.theories.suppliers.TestedOn;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.net.MalformedURLException;import java.net.URI;import java.net.URISyntaxException;import java.net.URL;public class HdfsApiDemo { FileSystem fileSystem = null; Configuration configuration = null; /** * HDFS的路径,core-site.xml中配置的端口号 */ public static final String HDFS_PATH = "hdfs://10.139.12.149:9000"; /** * 解决无权限访问,设置远程hadoop的linux用户名称 */ public static final String USER = "root"; /** * 单元测试之前的准备工作,准备环境,加载配置 * * @throws Exception */ @Before public void setUp() throws Exception { System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.2"); System.out.println("HDFSApp is setUp....."); configuration = new Configuration(); fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, USER); //1:获取FileSystem(分布式文件系统) // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); } /* 小文件的合并 */ @Test public void mergeFile() throws URISyntaxException, IOException, InterruptedException { //1:获取FileSystem(分布式文件系统) // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); //2:获取hdfs大文件的输出流 FSDataOutputStream outputStream = fileSystem.create(new Path("/big_txt.txt")); //3:获取一个本地文件系统 LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration()); //4:获取本地文件夹下所有文件的详情 FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("D:\\input")); //5:遍历每个文件,获取每个文件的输入流 for (FileStatus fileStatus : fileStatuses) { FSDataInputStream inputStream = localFileSystem.open(fileStatus.getPath()); //6:将小文件的数据复制到大文件 IOUtils.copy(inputStream, outputStream); IOUtils.closeQuietly(inputStream); } //7:关闭流 IOUtils.closeQuietly(outputStream); localFileSystem.close(); fileSystem.close(); } /* 文件的上传 */ @Test public void uploadFile() throws URISyntaxException, IOException { //1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); //2:调用方法,实现上传 fileSystem.copyFromLocalFile(new Path("E:\\apps\\hdfs\\hello.txt"), new Path("/user/ysw/input3/hello2.txt")); //3:关闭FileSystem fileSystem.close(); } /* 实现文件的下载:方式2 */ @Test public void downloadFile2() throws URISyntaxException, IOException, InterruptedException { //1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration(),"root"); //2:调用方法,实现文件的下载 fileSystem.copyToLocalFile(new Path("/user/ysw/input3/hello2.txt"), new Path("E:\\apps\\hdfs\\hello1.txt")); //3:关闭FileSystem fileSystem.close(); } /* 实现文件的下载 */ @Test public void downloadFile() throws URISyntaxException, IOException { //1:获取FileSystem // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); //2:获取hdfs的输入流 FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt")); //3:获取本地路径的输出流 FileOutputStream outputStream = new FileOutputStream("D://a.txt"); //4:文件的拷贝 IOUtils.copy(inputStream, outputStream); //5:关闭流 IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); fileSystem.close(); } /* hdfs创建文件夹 */ @Test public void mkdirsTest() throws URISyntaxException, IOException { //1:获取FileSystem实例 // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:9000"), new Configuration()); //2:创建文件夹 //boolean bl = fileSystem.mkdirs(new Path("/aaa/bbb/ccc")); fileSystem.create(new Path("/aaa/bbb/ccc/a.txt")); fileSystem.create(new Path("/aaa2/bbb/ccc/a.txt")); //System.out.println(bl); //3: 关闭FileSystem //fileSystem.close(); } /* hdfs文件的遍历 */ @Test public void listFiles() throws URISyntaxException, IOException { //1:获取FileSystem实例 // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:9000"), new Configuration()); //2:调用方法listFiles 获取 /目录下所有的文件信息 RemoteIteratoriterator = fileSystem.listFiles(new Path("/"), true); //3:遍历迭代器 while (iterator.hasNext()){ LocatedFileStatus fileStatus = iterator.next(); //获取文件的绝对路径 : hdfs://kafka1:8020/xxx System.out.println(fileStatus.getPath() + "----" +fileStatus.getPath().getName()); //文件的block信息 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); System.out.println("block数:"+blockLocations.length); } } /* 获取FileSystem;方式4 */ @Test public void getFileSystem4() throws URISyntaxException, IOException { // FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://kafka1:9000"), new Configuration()); System.out.println(fileSystem); } /* 获取FileSystem;方式3 */ @Test public void getFileSystem3() throws IOException { // Configuration configuration = new Configuration(); //指定文件系统类型 // configuration.set("fs.defaultFS", "hdfs://kafka1:8020"); //获取指定的文件系统 FileSystem fileSystem = FileSystem.newInstance(configuration); System.out.println(fileSystem); } /* 获取FileSystem;方式2 */ @Test public void getFileSystem2() throws URISyntaxException, IOException { // FileSystem fileSystem = FileSystem.get(new URI("hdfs://kafka1:8020"), new Configuration()); System.out.println(fileSystem); } /* 获取FileSystem;方式1 */ @Test public void getFileSystem1() throws IOException { //1:创建Configuration对象 Configuration configuration = new Configuration(); //2:设置文件系统的类型 configuration.set("fs.defaultFS", "hdfs://kafka1:9000"); //3:获取指定的文件系统 FileSystem fileSystem = FileSystem.get(configuration); //4:输出 System.out.println(fileSystem); } @Test public void urlHdfs() throws IOException { //1:注册url URL.setURLStreamHandlerFactory( new FsUrlStreamHandlerFactory()); //2:获取hdfs文件的输入流 InputStream inputStream = new URL("hdfs://kafka1:9000/a.txt").openStream(); //3:获取本地文件的输出流 FileOutputStream outputStream = new FileOutputStream(new File("D:\\hello2.txt")); //4:实现文件的拷贝 IOUtils.copy(inputStream, outputStream); //5:关流 IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); }}
转载地址:https://blog.csdn.net/yangshengwei230612/article/details/117929940 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
很好
[***.229.124.182]2024年04月19日 02时10分02秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
[网鼎杯 2020 朱雀组]phpweb
2019-04-29
[BJDCTF2020]Cookie is so stable
2019-04-29
[SUCTF 2019]Pythonginx
2019-04-29
[极客大挑战 2019]RCE ME
2019-04-29
HackTheBox-------ScriptKiddie
2019-04-29
Shell学习
2019-04-29
[Zer0pts2020]Can you guess it?
2019-04-29
Jenkins资料整理
2019-04-29
ArrayList源码常用方法注意点
2019-04-29
MySQL资料整理
2019-04-29
Redis常用文章整理
2019-04-29
RocketMQ资料整理
2019-04-29
慢sql统计
2019-04-29
基于webRTC的1V1在线视频聊天(网页版DEMO)
2019-04-29
Disconf数据安全保护设计方案
2019-04-29
HttpClient获取302重定向的新网址方法
2019-04-29
Java 函数优雅之道【大厂规范】
2019-04-29
第三方接口调用规范
2019-04-29
java中调用js函数的方法
2019-04-29
可落地的云游戏解决方案
2019-04-29