SequenceFile是Hadoop API提供的一種二進制儲存文件,可以將它看成是一個大容器,裡面包含許多小文件,只是它像資料結構中的鏈結串列將小文件串接,而每個節點都是一個key/value的形式,通常key為fileName,value為文件內容,另外sequenceFile也支持壓縮,壓縮有利於節省空間及傳輸時間。
一、由於SequenceFile並沒有建立小文件對應到大文件的索引,所以在查詢小文件時就必須進行遍歷,降低讀取效率。
二、具有壓縮功能,包含三種壓縮方式,可以透過API也可以透過Config來設定
(一)NONE:無壓縮
(二)RECORD:僅壓縮value部分
(三)BLOCK:key/value皆壓縮
SequenceFile.setDefaultCompressionType(conf, SequenceFile.CompressionType.BLOCK);//JAVA API
conf.set("io.seqfile.compression.type", "BLOCK");//Config
三、使用SequenceFile來實際撰寫「某一目錄下所有檔案」以seq方式上傳到HDFS。
(一)首先撰寫一個SequenceFileOperation的類,並包含建構子,設定HDFS的路徑
public class SequenceFileOperation {
private Configuration conf = new Configuration();
public SequenceFileOperation(String HDFS_PATH){
conf.set("fs.default.name",HDFS_PATH);
}
}
(二)寫一個writeTo方法,使用輸入串流將數據導到應用程序中,在使用Sequence.Writer將key、value以append方式追加,其中key為字串型態也就是Hadoop中的Text,而value為BytesWritable
public void writeTo(String srcPath, SequenceFile.Writer writer, Text key, BytesWritable value){
InputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(srcPath));
String fileName = srcPath.substring(srcPath.lastIndexOf("\\") + 1);
key.set(fileName);
int len = 0;
byte[] buff = new byte[1024];
while ((len = in.read(buff))!= -1) {
value.set(buff, 0, len);
writer.append(key, value);//將每筆資訊追加到SequenceFile.Writer的尾端
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(in);
}
}
在這裡並沒有一次寫入而是每次都寫1024個bytes,這樣的話讀取會較無效率,故可以改良成一次寫入全部。
byte[] buff = new byte[in.available()];
(三)撰寫uploadTo方法將指定目錄下所有檔案以打包形式上傳到HDFS,writer的部分使用SequenceFile.Writer
public void uploadTo(String srcDir,String desc){
SequenceFile.Writer writer = null;
try {
FileSystem fileSystem = FileSystem.get(conf);
SequenceFile.setDefaultCompressionType(conf, SequenceFile.CompressionType.BLOCK);
Text key = new Text();
BytesWritable value = new BytesWritable();
writer = SequenceFile.createWriter(fileSystem,conf,new Path(desc),key.getClass(),value.getClass());
File folder = new File(srcDir);
String[] list = folder.list();
for (int i = 0; i < list.length; i++) {
String filePath = srcDir + "\\" + list[i];
writeTo(filePath, writer, key, value);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(writer);
}
}
(四)以SequenceFile.Reader來下載,下載的部分給予該檔案位於哪個.seq檔,及檔案名稱來進行循序搜尋,因為在上傳時是以檔案名稱當作key故需給予檔案名稱。
public void downloadFromSeq(String src, String desc, String fileName){
SequenceFile.Reader reader = null;
OutputStream out = null;
try {
FileSystem fileSystem = FileSystem.get(conf);
reader = new SequenceFile.Reader(fileSystem,new Path(src),conf);
Text key = new Text();
Text compareKey = new Text(fileName);
BytesWritable value = new BytesWritable();
out = new BufferedOutputStream(new FileOutputStream(desc));
while (reader.next(key,value)){
if (!key.equals(compareKey)) continue;
out.write(value.getBytes(),0,value.getLength());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(out);
IOUtils.closeStream(reader);
}
}
(五)最後實際來使用上傳及下載的方法
public static void main(String[] args) {
final String HDFS_PATH = "hdfs://192.168.121.130:9000";
SequenceFileOperation sequenceFileOperation = new SequenceFileOperation(HDFS_PATH);
sequenceFileOperation.uploadTo("C:\\Users\\hduser\\Downloads\\myTestFiles","/testFile/test.seq");
// /testFile includes...
// amb.exe
// 1.pdf
// 2.pdf
// 3.pdf
// 4.pdf
// test.txt
sequenceFileOperation.downloadFromSeq("/testFile/test.seq","C:\\Users\\hduser\\Downloads\\1.pdf","1.pdf");
}
留言
張貼留言