一、當建置好Hadoop開發環境後就可以開始運用Hadoop所提供的Java API來對HDFS進行操作,透過這個API能夠進行HDFS的創建文件、上傳檔案、下載檔案、刪除檔案等,而以下是文件操作所牽涉到的幾個類:
(一)Configuration類:主要是將客戶端或者服務器的配置導入並使用甚至動態修改。
(二)FileSystem類:定義了一個hadoop的文件系統接口,該類是一個抽象類,透過以下靜態工廠方法取得實例。
Configuration conf = new Configuration();
FileSystem filesystem = FileSystem.get(conf);
FileSystem filesystem = FileSystem.get(new URI(...),conf);
二、接著再了解API的類之後就是環境的搭建了,以本身的環境為例:
(一)使用一台虛擬機,內部再用輕量虛擬化技術Docker,虛擬出兩個Container分別為Slave1、Slave2,而該台虛擬機本身就當成master,並將Master、Slave1、Slave2的IP位址暴露出來,讓我們的本機可以透過JAVA API去操作。
(二)本機就依照前篇所講的配置Java開發環境:Intellij hadoop hdfs Environment Settings
三、接下來就是透過上面幾個類來實作HDFS的操作,因此首先我們可以設計一個HDFS操作類名為HDFSOperation,內部宣告一個Configuration類的變數conf,並以建構子進行配置,這裡主要以HDFS順利操作為原則,因此只要配置master的路徑即可,讓我們順利跟namenode溝通。
/**
* HDFS操作實例
*/
public class HDFSOperation {
private Configuration conf = new Configuration();
/**
* 建構子,根據(HDFS_PATH)進行配置
* @param HDFS_PATH
*/
public HDFSOperation(String HDFS_PATH){
conf.set("fs.default.name",HDFS_PATH);
}
}
四、以下撰寫幾個常用的API的method:
/**
* 建立資料夾
* @param dirPath
*/
public void createDir(String dirPath){
try {
FileSystem filesystem = FileSystem.get(conf);
filesystem.mkdirs(new Path(dirPath));
System.out.print("創建資料夾成功");
filesystem.close();
} catch (IOException ex) {
Logger.getLogger(HDFSOperation.class.getName()).log(Level.SEVERE, null, ex);
}
}
/**
* 取得目錄下檔案狀態,e.g path、size等
* @param dirPath
* @return
*/
public String getFileStatus(String dirPath){
try {
FileSystem filesystem = FileSystem.get(conf);
FileStatus[] list = filesystem.listStatus(new Path(dirPath));
StringBuilder sbMessages = new StringBuilder();
for (FileStatus f : list){
String message = String.format("Path:%s,isFolder:%s,Size:%s",f.getPath(),f.isDirectory(),f.getLen());
sbMessages.append(message).append("\n");
}
filesystem.close();
return sbMessages.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
*刪除遠端文件或者檔案(path),傳入isDir來做區別
* @param path
* @param isDir
*/
public void delete(String path,boolean isDir){
try {
FileSystem filesystem = FileSystem.get(conf);
filesystem.delete(new Path(path),isDir);
System.out.print("刪除成功");
filesystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 將本地文件(src)上傳到HDFS服務器指定路徑(desc),其中比較特殊的是OutputStream,要根據FileSystem這個類去取得輸出串流,並透過writeStreamingTo這個方法將串流輸出成檔案,而緩衝區大小為4096bytes。
* @param src 來源端路徑(本地)
* @param desc 目的端路徑(HDFS)
*/
public void uploadFileToHDFS(String src,String desc){
try {
FileSystem filesystem = FileSystem.get(conf);
InputStream in = new BufferedInputStream(new FileInputStream(src));
OutputStream out = filesystem.create(new Path(desc));
writeStreamingTo(in,out,4096);
//------------------using copyFromLocalFile------------------------//
//filesystem.copyFromLocalFile(new Path(src),new Path(desc));
System.out.print(String.format("Upload from %s to %s",src,conf.get("fs.default.name") + desc));
filesystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 從HDFS上,下載檔案到本地端,其中比較特殊的是InputStream,要根據FileSystem這個類去取得輸入串流,並透過writeStreamingTo這個方法將串流輸出成檔案,而緩衝區大小為4096bytes。
* @param src 來源端路徑(HDFS)
* @param desc 目的端路徑(本地)
*/
public void downloadFileFromHDFS(String src,String desc){
try {
FileSystem filesystem = FileSystem.get(conf);
OutputStream out = new BufferedOutputStream(new FileOutputStream(desc));
InputStream in = filesystem.open(new Path(src));
writeStreamingTo(in,out,4096);
//------------------using copyFromLocalFile------------------------//
//filesystem.copyFromLocalFile(new Path(src),new Path(desc));
System.out.print(String.format("Download from %s to %s",src,conf.get("fs.default.name") + desc));
filesystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 將來源數據利用輸入串流取出到應用程序,再利用輸出串流將檔案寫到目的,其中利用buffersize來控制緩衝區大小,並掌握進度。
* @param in 輸入串流
* @param out 輸出串流
* @param bufferSize 緩衝區大小
*/
private void writeStreamingTo(InputStream in,OutputStream out,int bufferSize){//待修改
try {
byte[] buffer = new byte[bufferSize];
int len = 0;
int totalBytes = in.available();
int writedBytes = 0;
System.out.println("Start Download.....");
while ((len = in.read(buffer)) > 0) {
writedBytes += len;
out.write(buffer,0,len);
System.out.println(String.format("%.2f%s",writedBytes / (double)totalBytes * 100,"%"));
}
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
五、最後實際來操作以上幾個方法:
public static void main(String[] args) {
final String HDFS_PATH = "hdfs://192.168.121.130:9000" ;
HDFSOperation hdfs = new HDFSOperation(HDFS_PATH);
System.out.print(hdfs.getFileStatus("/"));
// hdfs.createDir("/tmp");
// hdfs.uploadFileToHDFS("C:\\Users\\test\\Downloads\\hadoop-2.6.0.tar.gz","/tmp/hadoop-2.6.0.tar.gz");
// hdfs.downloadFileFromHDFS("/tmp/hadoop-2.6.0.tar.gz","C:\\Users\\test\\Downloads\\hadoop-2.6.0.tar.gz");
// hdfs.delete("/tmp/hadoop-2.6.0.tar.gz",false);
// hdfs.delete("/tmp",true);
}
留言
張貼留言