SequenceFile Files

Published: 2023/12/20

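SequenceFile is a flat file format that Hadoop designed for storing key-value pairs in binary form. A number of solutions for storing small files in HDFS have been built on top of it. Their basic idea is to merge the small files into one large file and to build an index over the positions of those small files. Such solutions also involve another Hadoop file format, MapFile. A SequenceFile does not guarantee that its key-value pairs are stored in any particular key order, and, in the Hadoop version discussed here, it does not support appending to an existing file.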

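In a SequenceFile, each key-value pair is treated as one record. Based on this record-oriented compression strategy, a SequenceFile supports three compression types (SequenceFile.CompressionType):

NONE: records are not compressed;

RECORD: only the value of each record is compressed;

BLOCK: all records in a block are compressed together (as the code further on shows, keys and values are buffered and compressed separately).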

For these three compression types, Hadoop provides three corresponding Writers:

SequenceFile.Writer writes key-value pairs (records) without any compression;

[java]
    public static class Writer implements java.io.Closeable {

      ...
      // Initialize the Writer
      void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException {
        this.conf = conf;
        this.out = out;
        this.keyClass = keyClass;
        this.valClass = valClass;
        this.compress = compress;
        this.codec = codec;
        this.metadata = metadata;

        // Create the serializers that write uncompressed bytes into buffer
        SerializationFactory serializationFactory = new SerializationFactory(conf);
        this.keySerializer = serializationFactory.getSerializer(keyClass);
        this.keySerializer.open(buffer);
        this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
        this.uncompressedValSerializer.open(buffer);

        // Create the serializer that writes through the compression stream
        if (this.codec != null) {
          ReflectionUtils.setConf(this.codec, this.conf);
          this.compressor = CodecPool.getCompressor(this.codec);
          this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
          this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
          this.compressedValSerializer = serializationFactory.getSerializer(valClass);
          this.compressedValSerializer.open(deflateOut);
        }
      }


      // Append one record (key-value objects that still need to be serialized)
      public synchronized void append(Object key, Object val) throws IOException {
        if (key.getClass() != keyClass)
          throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);

        if (val.getClass() != valClass)
          throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);

        buffer.reset();

        // Serialize the key (convert it to a byte array) and write it into buffer
        keySerializer.serialize(key);
        int keyLength = buffer.getLength();
        if (keyLength < 0)
          throw new IOException("negative length keys not allowed: " + key);

        // compress was set to false at initialization for this uncompressed Writer
        if (compress) {
          deflateFilter.resetState();
          compressedValSerializer.serialize(val);
          deflateOut.flush();
          deflateFilter.finish();
        } else {
          // Serialize the value (uncompressed) and write it into buffer
          uncompressedValSerializer.serialize(val);
        }

        // Write this record to the file stream
        checkAndWriteSync();                                // sync
        out.writeInt(buffer.getLength());                   // total record length
        out.writeInt(keyLength);                            // key portion length
        out.write(buffer.getData(), 0, buffer.getLength()); // data
      }

      // Append one record (key-value given as raw bytes)
      public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
        if (keyLength < 0)
          throw new IOException("negative length keys not allowed: " + keyLength);

        int valLength = val.getSize();

        checkAndWriteSync();

        // Write the key-value pair directly to the file stream
        out.writeInt(keyLength+valLength);          // total record length
        out.writeInt(keyLength);                    // key portion length
        out.write(keyData, keyOffset, keyLength);   // key
        val.writeUncompressedBytes(out);            // value
      }

    ...

    }
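The record layout that append() and appendRaw() produce (total record length, key length, key bytes, value bytes) can be illustrated without Hadoop. The following is only a minimal sketch: the class RecordLayoutSketch and its encode/decode methods are hypothetical names, the key and value are assumed to be already-serialized byte arrays, and the sync markers and file header that the real Writer emits are left out. ByteBuffer is used because its default big-endian putInt matches what DataOutput.writeInt writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of one uncompressed record; not Hadoop code.
final class RecordLayoutSketch {

    // Lays out one record in the same order as the writes in append().
    public static byte[] encode(byte[] key, byte[] value) {
        return ByteBuffer.allocate(8 + key.length + value.length)
                .putInt(key.length + value.length) // total record length
                .putInt(key.length)                // key portion length
                .put(key)                          // key bytes
                .put(value)                        // value bytes
                .array();
    }

    // Reads one record back and returns {key, value}.
    public static byte[][] decode(byte[] record) {
        ByteBuffer in = ByteBuffer.wrap(record);
        int recordLength = in.getInt();
        int keyLength = in.getInt();
        byte[] key = new byte[keyLength];
        in.get(key);
        // The value length is not stored; it is the remainder of the record.
        byte[] value = new byte[recordLength - keyLength];
        in.get(value);
        return new byte[][] {key, value};
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] record = encode(key, value);
        System.out.println(record.length); // 8 length bytes + 2 + 5 = 15
        byte[][] parts = decode(record);
        System.out.println(new String(parts[1], StandardCharsets.UTF_8)); // hello
    }
}
```

Note that only the key length is written explicitly. A reader recovers the value length by subtracting it from the total record length.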

SequenceFile.RecordCompressWriter compresses only the value of each key-value pair (record) when writing;

[java]
    static class RecordCompressWriter extends Writer {
    ...

      public synchronized void append(Object key, Object val) throws IOException {
        if (key.getClass() != keyClass)
          throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);

        if (val.getClass() != valClass)
          throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);

        buffer.reset();

        // Serialize the key (convert it to a byte array) and write it into buffer
        keySerializer.serialize(key);
        int keyLength = buffer.getLength();
        if (keyLength < 0)
          throw new IOException("negative length keys not allowed: " + key);

        // Serialize the value through the compression stream, so that the
        // compressed bytes are written into buffer
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();

        // Write this record to the file stream
        checkAndWriteSync();                                // sync
        out.writeInt(buffer.getLength());                   // total record length
        out.writeInt(keyLength);                            // key portion length
        out.write(buffer.getData(), 0, buffer.getLength()); // data
      }

      /** Append one record (key-value given as raw bytes, value already compressed) */
      public synchronized void appendRaw(byte[] keyData, int keyOffset,
          int keyLength, ValueBytes val) throws IOException {

        if (keyLength < 0)
          throw new IOException("negative length keys not allowed: " + keyLength);

        int valLength = val.getSize();

        checkAndWriteSync();                        // sync
        out.writeInt(keyLength+valLength);          // total record length
        out.writeInt(keyLength);                    // key portion length
        out.write(keyData, keyOffset, keyLength);   // 'key' data
        val.writeCompressedBytes(out);              // 'value' data
      }

    } // RecordCompressionWriter


    ...
    }
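To make the effect of record compression concrete, here is a small sketch of a record whose key is stored as-is while its value is compressed. It is an illustration under assumptions, not the Hadoop implementation: RecordCompressSketch and its methods are hypothetical names, and java.util.zip.Deflater stands in for whatever CompressionCodec the real writer is configured with.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical illustration of a record-compressed record; not Hadoop code.
final class RecordCompressSketch {

    // Compresses a byte array with zlib (stand-in for the configured codec).
    public static byte[] deflate(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Reverses deflate(); fails loudly on truncated or corrupt input.
    public static byte[] inflate(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated compressed value");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed value", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    // Same layout as the uncompressed record, but the value bytes are compressed.
    public static byte[] encode(byte[] key, byte[] value) {
        byte[] compressedValue = deflate(value);
        return ByteBuffer.allocate(8 + key.length + compressedValue.length)
                .putInt(key.length + compressedValue.length) // total record length
                .putInt(key.length)                          // key portion length
                .put(key)                                    // key, uncompressed
                .put(compressedValue)                        // value, compressed
                .array();
    }

    // The key can be read without decompressing anything.
    public static byte[] readKey(byte[] record) {
        ByteBuffer in = ByteBuffer.wrap(record);
        in.getInt(); // skip total record length
        byte[] key = new byte[in.getInt()];
        in.get(key);
        return key;
    }

    // The value must be decompressed before use.
    public static byte[] readValue(byte[] record) {
        ByteBuffer in = ByteBuffer.wrap(record);
        int recordLength = in.getInt();
        int keyLength = in.getInt();
        in.position(in.position() + keyLength); // skip the key bytes
        byte[] compressedValue = new byte[recordLength - keyLength];
        in.get(compressedValue);
        return inflate(compressedValue);
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] value = new byte[1000]; // 1000 zero bytes, highly compressible
        byte[] record = encode(key, value);
        System.out.println("record bytes: " + record.length);
        System.out.println("key: " + new String(readKey(record), StandardCharsets.UTF_8));
        System.out.println("value bytes after inflate: " + readValue(record).length);
    }
}
```

Because each value is compressed on its own, this scheme pays the compression overhead per record, which is one reason small values may compress poorly with it.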
SequenceFile.BlockCompressWriter compresses a batch of key-value pairs (records) into one block when writing;

[java]
    static class BlockCompressWriter extends Writer {
    ...

      void init(int compressionBlockSize) throws IOException {
        this.compressionBlockSize = compressionBlockSize;
        keySerializer.close();
        keySerializer.open(keyBuffer);
        uncompressedValSerializer.close();
        uncompressedValSerializer.open(valBuffer);
      }

      /** Workhorse to check and write out compressed data/lengths */
      private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {
        deflateFilter.resetState();
        buffer.reset();
        deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
        deflateOut.flush();
        deflateFilter.finish();

        WritableUtils.writeVInt(out, buffer.getLength());
        out.write(buffer.getData(), 0, buffer.getLength());
      }

      /** Compress and flush contents to dfs */
      public synchronized void sync() throws IOException {
        if (noBufferedRecords > 0) {
          super.sync();

          // No. of records
          WritableUtils.writeVInt(out, noBufferedRecords);

          // Write 'keys' and lengths
          writeBuffer(keyLenBuffer);
          writeBuffer(keyBuffer);

          // Write 'values' and lengths
          writeBuffer(valLenBuffer);
          writeBuffer(valBuffer);

          // Flush the file-stream
          out.flush();

          // Reset internal states
          keyLenBuffer.reset();
          keyBuffer.reset();
          valLenBuffer.reset();
          valBuffer.reset();
          noBufferedRecords = 0;
        }

      }


      // Append one record (key-value objects that still need to be serialized)
      public synchronized void append(Object key, Object val) throws IOException {
        if (key.getClass() != keyClass)
          throw new IOException("wrong key class: "+key+" is not "+keyClass);

        if (val.getClass() != valClass)
          throw new IOException("wrong value class: "+val+" is not "+valClass);

        // Serialize the key (uncompressed) and write it into keyBuffer
        int oldKeyLength = keyBuffer.getLength();
        keySerializer.serialize(key);
        int keyLength = keyBuffer.getLength() - oldKeyLength;
        if (keyLength < 0)
          throw new IOException("negative length keys not allowed: " + key);
        WritableUtils.writeVInt(keyLenBuffer, keyLength);

        // Serialize the value (uncompressed) and write it into valBuffer
        int oldValLength = valBuffer.getLength();
        uncompressedValSerializer.serialize(val);
        int valLength = valBuffer.getLength() - oldValLength;
        WritableUtils.writeVInt(valLenBuffer, valLength);

        // Added another key/value pair
        ++noBufferedRecords;

        // Compress and flush
        int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
        // The block is full: compress the whole block and write it to the file stream
        if (currentBlockSize >= compressionBlockSize) {
          sync();
        }
      }

      /** Append one record (key-value given as raw bytes; the value is buffered uncompressed). */
      public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {

        if (keyLength < 0)
          throw new IOException("negative length keys not allowed");

        int valLength = val.getSize();

        // Save key/value data in relevant buffers
        WritableUtils.writeVInt(keyLenBuffer, keyLength);
        keyBuffer.write(keyData, keyOffset, keyLength);
        WritableUtils.writeVInt(valLenBuffer, valLength);
        val.writeUncompressedBytes(valBuffer);

        // Added another key/value pair
        ++noBufferedRecords;

        // Compress and flush
        int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
        if (currentBlockSize >= compressionBlockSize) {
          sync();
        }
      }

    } // BlockCompressWriter


    ...
    }
In the source code, the block size compressionBlockSize defaults to 1000000 bytes. It can also be set through the configuration parameter io.seqfile.compress.blocksize.
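The way the block size threshold drives flushing can be sketched with plain Java buffers. This is a simplified model under assumptions: BlockBufferSketch is a hypothetical class, it only records how many records each flushed block held, and it skips the compression and the length buffers that the real BlockCompressWriter also writes.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the block-flush threshold; not Hadoop code.
final class BlockBufferSketch {
    private final int compressionBlockSize;
    private final ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
    private final ByteArrayOutputStream valBuffer = new ByteArrayOutputStream();
    private final List<Integer> recordsPerBlock = new ArrayList<>();
    private int noBufferedRecords = 0;

    public BlockBufferSketch(int compressionBlockSize) {
        this.compressionBlockSize = compressionBlockSize;
    }

    // Buffers one record, then flushes if the uncompressed key and value
    // bytes together have reached the threshold.
    public void append(byte[] key, byte[] val) {
        keyBuffer.write(key, 0, key.length);
        valBuffer.write(val, 0, val.length);
        ++noBufferedRecords;
        int currentBlockSize = keyBuffer.size() + valBuffer.size();
        if (currentBlockSize >= compressionBlockSize) {
            sync();
        }
    }

    // Emits the buffered records as one block and resets the buffers.
    // A real writer would compress and write the buffers here.
    public void sync() {
        if (noBufferedRecords > 0) {
            recordsPerBlock.add(noBufferedRecords);
            keyBuffer.reset();
            valBuffer.reset();
            noBufferedRecords = 0;
        }
    }

    public List<Integer> recordsPerBlock() {
        return recordsPerBlock;
    }

    public static void main(String[] args) {
        BlockBufferSketch writer = new BlockBufferSketch(10);
        for (int i = 0; i < 5; i++) {
            writer.append(new byte[2], new byte[3]); // 5 bytes per record
        }
        writer.sync(); // flush the last, partially filled block
        System.out.println(writer.recordsPerBlock()); // [2, 2, 1]
    }
}
```

Two details carry over from the listing above: the threshold is compared against uncompressed key and value bytes, and the check happens after a record is buffered, so a block can exceed compressionBlockSize by up to one record.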

Corresponding to the three compression types, there are three SequenceFile file formats:

1). Uncompressed SequenceFile

2). Record-Compressed SequenceFile

3). Block-Compressed SequenceFile

Reposted from: https://www.cnblogs.com/mfryf/p/7072446.html
