日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

mapreduce原理_Hbase Bulkload 原理面试必备

發布時間:2024/9/27 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mapreduce原理_Hbase Bulkload 原理面试必备 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

當需要大批量的向Hbase導入數據時,我們可以使用Hbase Bulkload的方式,這種方式是先生成Hbase的底層存儲文件 HFile,然后直接將這些 HFile 移動到Hbase的存儲目錄下。它相比調用Hbase 的 put 接口添加數據,處理效率更快并且對Hbase 運行影響更小。

下面假設我們有一個 CSV 文件,是存儲用戶購買記錄的。它一共有三列, order_id,consumer,product。我們需要將這個文件導入到Hbase里,其中 order_id 作為Hbase 的 row key。

12345bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=$'\x01'-Dimporttsv.columns=HBASE_ROW_KEY,cf:consumer,cf:product -Dimporttsv.bulk.output= bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

可以看到批量導入只需要上述兩部, 生成 HFile 文件 和 加載 HFile 文件。下面我們來深入了解其原理

底層實現原理

生成 HFile 是調用了 MapReduce 來實現的。它有兩種實現方式,雖然最后生成的 HFile 是一樣的,但中間過程卻是不一樣。現在我們先回顧下 MapReduce 的編程模型,主要分為下列組件:

  • InputFormat:負責讀取數據源,并且將數據源切割成多個分片,分片的數目等于Map的數目

  • Mapper:負責接收分片,生成中間結果,K 為數據的 key 值類型,V為數據的 value 值類型

  • Reducer:Mapper的數據會按照 key 值分組,Reducer接收的數據格式>

  • OutputFormat:負責將Reducer生成的數據持久化,比如存儲到 hdfs。

MapReduce 實現 一

MapReducer 程序中各個組件的實現類,如下所示:

  • InputFormat 類:TextInputFormat,數據輸出格式 LongWritable,Text(數據所在行號,行數據)

  • Mapper 類:TsvImporterTextMapper,數據輸出格式 ImmutableBytesWritable, Text(row key,行數據)

  • Reduce 類:TextSortReducer,數據輸出格式 ImmutableBytesWritable, KeyValue (row key,單列數據)

  • OutputFormat 類:HFileOutputFormat2,負責將結果持久化 HFile

執行過程如下:

  • TextInputFormat 會讀取數據源文件,按照文件在 hdfs 的 Block 切割,每個Block對應著一個切片

  • Mapper 會解析每行數據,然后從中解析出 row key,生成(row key, 行數據)

  • Reducer 會解析行數據,為每列生成 KeyValue。這里簡單說下 KeyValue,它是 Hbase 存儲每列數據的格式, 詳細原理后面會介紹到。如果一個 row key 對應的列過多,它會將列分批處理。處理完一批數據之后,會寫入(null,null)這一條特殊的數據,表示 HFileOutputFormat2 在持久化的過程中,需要新創建一個 HFile。

  • 這里簡單的說下 TextSortReducer,它的原理與下面的實現方式二,使用到的 PutSortReducer 相同,只不過從 Map 端接收到的數據為原始的行數據。如果 row key 對應的數據過多時,它也會使用 TreeSet 來去重,TreeSet 保存的數據最大字節數,不能超過1GB。如果超過了,那么就會分批輸。

    MapReduce 實現 二

    MapReducer 程序中各個組件的實現類,如下所示:

    • InputFormat 類:TextInputFormat,數據輸出格式 LongWritable,Text(數據所在行號,數據)

    • Mapper 類:TsvImporterMapper,數據輸出格式 ImmutableBytesWritable,Put (row key,Put)

    • Combiner 類:PutCombiner

    • Reducer 類:PutSortReducer,數據輸出格式 ImmutableBytesWritable, KeyValue(row key,單列數據)

    • OutputFormat 類:HFileOutputFormat2,負責將結果持久化 HFile

    這里使用了 Combiner,它的作用是在 Map 端進行一次初始的 reduce 操作,起到聚合的作用,這樣就減少了 Reduce 端與 Map 端的數據傳輸,提高了運行效率。

    執行過程如下:

  • TextInputFormat 會讀取數據源文件,原理同實現 一

  • Mapper 會解析每行數據,然后從中解析出 row key,并且生成 Put 實例。生成(row key, Put)

  • Combiner 會按照 row key 將多個 Put 進行合并,它也是分批合并的。

  • Reducer 會遍歷 Put 實例,為每列生成 KeyValue 并且去重。

  • 這里講下PutSortReducer的具體實現,下面的代碼經過簡化,去掉了KeyValue中關于Tag的處理:

    1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556public class PutSortReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> { // the cell creator private CellCreator kvCreator; @Override protected void reduce( ImmutableBytesWritable row, java.lang.Iterable puts, Reducer ImmutableBytesWritable, KeyValue>.Context context) throws java.io.IOException, InterruptedException { // 這里指定了一個閾值,默認為10GB過大。如果puts中不重復的數據過大,就會按照這個閾值分批處理 long threshold = context.getConfiguration().getLong( "putsortreducer.row.threshold", 1L * (1<<30)); Iterator iter = puts.iterator(); // 開始遍歷 puts列表 while (iter.hasNext()) { // 這個TreeSet就是用來去重的,比如向同個qualifier添加值 TreeSet map = new TreeSet<>(CellComparator.getInstance()); // 記錄map里保存的數據長度 long curSize = 0; // 遍歷 puts列表,直到不重復的數據不超過閾值 while (iter.hasNext() && curSize < threshold) { // 從列表中獲取值 Put p = iter.next(); // 遍歷這個Put的所有列值,一個Put包含了多列,這些列由Cell表示 for (List cells: p.getFamilyCellMap().values()) { for (Cell cell: cells) { KeyValue kv = null; kv = KeyValueUtil.ensureKeyValue(cell); } if (map.add(kv)) { // 如果這列值沒有重復,那么添加到TreeSet中,并且更新curSize的值 curSize += kv.heapSize(); } } } } // 將map里的數據,調用context.write方法輸出 int index = 0; for (KeyValue kv : map) { context.write(row, kv); if (++index % 100 == 0) context.setStatus("Wrote " + index); } // 如果還有,那么說明此行數據過大,那么就會輸出一條特殊的記錄(null, null) if (iter.hasNext()) { // force flush because we cannot guarantee intra-row sorted order context.write(null, null); } } }}

    從上面的代碼可以看到,PutSortReducer會使用到TreeSet去重,TreeSet會保存數據,默認不超過 1GB。如果當Reducer的內存設置過小時,并且數據過大時,是有可能會造成內存溢出。如果遇到這種情況,可以通過減少閾值或者增大Reducer的內存。

    兩種實現方式比較

    第一種方式實現簡單,它從Map 端傳遞到 Reduce 端的中間結果的數據格式很緊湊,如果是數據源重復的數據不多,建議使用這種。

    第二種方式實現相對復雜,它從Map 端傳遞到 Reduce 端的中間結果的數據格式,使用 Put 來表示,它的數據存儲比原始的數據要大。但是它使用了 Combiner 來初步聚合,減小了 Map 端傳遞到 Reduce 端的數據大小。如果是數據源重復比較多,建議采用第二種方式。

    Hbase 默認采用第二種方式,如果用戶想使用第一種方式,需要在運行命令時,指定 importtsv.mapper.class 的值為 org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper。

    數據源解析

    Mapper 接收到數據后,需要解析每行數據,從中讀取各列的值。它會按照分割符來切割數據,然后根據指定的列格式,生成每列的數據。客戶在使用命令時,通過 importtsv.separator 參數指定分隔符,通過 importtsv.columns 參數指定列格式。在客戶端指定的列名中, 有些會有著特殊含義,比如 HBASE_ROW_KEY 代表著該列是作為 row key,HBASE_TS_KEY 代表著該列作為數據的 timestamp,HBASE_ATTRIBUTES_KEY 代表著該列是屬性列等。

    TsvParser 類負責解析數據,它定義在 ImportTsv 類里。這里需要注意下,它不支持負責的 CSV 格式,只是簡單的根據分隔符作為列的劃分,根據換行符作為每條數據的劃分。

    它的原理比較簡單,這里不再詳細介紹。

    Reducer的數目選擇

    我們知道MapReduce程序的一般瓶頸在于 reduce 階段,如果我們能夠適當增加 reduce 的數目,一般能夠提高運行效率(如果數據傾斜不嚴重)。我們還知道 Hbase 支持超大數據量的表,它會將表的數據自動切割,分布在不同的服務上。這些數據切片在 Hbase 里,稱為Region, 每個Region只負責一段 row key 范圍的數據。

    Hbase 在批量導入的時候,會去獲取表的 Region 分布情況,然后將 Reducer 的數目 設置為 Region 數目。如果在導入數據之前還沒有創建表,Hbase會自動創建,但是創建的表的region數只有一個。所以在生成HFile之前,我們可以自行創建表,并指定 Reigion 的分布情況,那么就能提高 Reducer 的數目。

    Reducer 的數目決定,是在 HFileOutputFormat2 的 configureIncrementalLoad 方法里。它會讀取表的 region 分布情況,然后調用 setNumReduceTasks 方法設置 reduce 數目。下面的代碼經過簡化:

    12345678910111213141516171819202122232425public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> { public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, RegionLocator regionLocator) throws IOException { ArrayList singleTableInfo = new ArrayList<>(); singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } static void configureIncrementalLoad(Job job, List multiTableInfo, Class extends OutputFormat, ?>> cls) throws IOException { // 這里雖然支持多表,但是批量導入時只會使用單表 List regionLocators = new ArrayList<>( multiTableInfo.size()); for( TableInfo tableInfo : multiTableInfo ) { // 獲取region分布情況 regionLocators.add(tableInfo.getRegionLocator()); ...... } // 獲取region的row key起始大小 List startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); // 設置reduce的數目 job.setNumReduceTasks(startKeys.size()); } }

    Hbase 數據存儲格式

    Hbase的每列數據都是單獨存儲的,都是以 KeyValue 的形式。KeyValue 的數據格式如下圖所示:

    123----------------------------------------------- keylength | valuelength | key | value | Tags-----------------------------------------------

    其中 key 的格式如下:

    123---------------------------------------------------------------------------------------------- rowlength | row | columnfamilylength | columnfamily | columnqualifier | timestamp | keytype ----------------------------------------------------------------------------------------------

    Tags的格式如下:

    123------------------------- tagslength | tagsbytes -------------------------

    tagsbytes 可以包含多個 tag,每個 tag 的格式如下:

    123---------------------------------- taglength | tagtype | tagbytes----------------------------------

    Reducer 會使用 CellCreator 類,負責生成 KeyValue。CellCreator 的原理很簡單,這里不再詳細介紹。

    生成 HFile

    HFileOutputFormat2 負責將Reduce的結果,持久化成 HFile 文件。持久化目錄的格式如下:

    1234567.|---- column_family_1| |---- uuid_1| `---- uuid_2|---- column_family_2| |---- uuid3| `---- uuid4

    每個 column family 對應一個目錄,這個目錄會有多個 HFile 文件。

    HFileOutputFormat2 會創建 RecordWriter 實例,所有數據的寫入都是通過 RecordWriter。

    12345678910111213141516171819public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> { @Override public RecordWritergetRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { // 調用createRecordWriter方法創建 return createRecordWriter(context, this.getOutputCommitter(context)); } static RecordWriter createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer) throws IOException { // 實例化一個匿名類 return new RecordWriter() { ...... } }}

    可以看到 createRecordWriter 方法,返回了一個匿名類。繼續看看這個匿名類的定義:

    123456789101112// 封裝了StoreFileWriter,記錄了寫入的數據長度static class WriterLength { long written = 0; StoreFileWriter writer = null;}class RecordWriter<ImmutableBytesWritable, V>() { // key值為表名和column family組成的字節,value為對應的writer private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); // 是否需要創建新的HFile private boolean rollRequested = false;}

    從上面 WriterLength 類的定義,我們可以知道 RecordWriter的底層原理是調用了StoreFileWriter的接口。對于StoreFile,我們回憶下Hbase的寫操作,它接收客戶端的寫請求,首先寫入到內存中MemoryStore,然后刷新到磁盤生成StoreFile。如果該表有兩個column family,就會有兩個MemoryStore和兩個StoreFile,對應于不同的column family。所以 RecordWriter 類有個哈希表,記錄著每個 column family 的 StoreFileWriter。(這里說的 StoreFile 也就是 HFile)

    因為 HFile 支持不同的壓縮算法,不同的塊大小,RecordWriter 會根據配置,獲取HFile的格式,然后創建對應的 StoreFileWriter。下面創建 StoreFileWriter 時只指定了文件目錄,StoreFileWriter會在這個目錄下,使用 uuid 生成一個唯一的文件名。

    1234567891011121314151617181920212223242526272829303132333435363738class RecordWriter<ImmutableBytesWritable, V>() { // favoredNodes 表示創建HFile文件,希望盡可能在這些服務器節點上 private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf, InetSocketAddress[] favoredNodes) throws IOException { // 根據表名和column family生成唯一字節 byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family); Path familydir = new Path(outputDir, Bytes.toString(family)); WriterLength wl = new WriterLength(); // 獲取HFile的壓縮算法 Algorithm compression = compressionMap.get(tableAndFamily); // 獲取bloom過濾器信息 BloomType bloomType = bloomTypeMap.get(tableAndFamily); // 獲取HFile其他的配置 ..... // 生成HFile的配置信息 HFileContextBuilder contextBuilder = new HFileContextBuilder() .withCompression(compression) .withChecksumType(HStore.getChecksumType(conf)) .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) .withBlockSize(blockSize); HFileContext hFileContext = contextBuilder.build(); // 實例化 StoreFileWriter f (null == favoredNodes) { wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build(); } else { wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) .withFavoredNodes(favoredNodes).build(); } // 添加到 writers集合中 this.writers.put(tableAndFamily, wl); return wl; }}

    繼續看看 RecordWriter 的寫操作:

    1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768class RecordWriter<ImmutableBytesWritable, V>() { @Override public void write(ImmutableBytesWritable row, V cell) Cell kv = cell; // 收到空數據,表示需要立即刷新到磁盤,并且創建新的HFile if (row == null && kv == null) { // 刷新到磁盤 rollWriters(null); return; } // 根據table和column family生成唯一值 byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); // 獲取對應的writer WriterLength wl = this.writers.get(tableAndFamily); if (wl == null) { // 如果為空,那么先創建對應的文件目錄 Path writerPath = null; writerPath = new Path(outputDir, Bytes.toString(family)); fs.mkdirs(writerPath); } // 檢測當前HFile的大小是否超過了最大值,默認為10GB if (wl != null && wl.written + length >= maxsize) { this.rollRequested = true; } // 如果當前HFile過大,那么需要將它刷新到磁盤 if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { rollWriters(wl); } // 創建writer if (wl == null || wl.writer == null) { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { // 如果開啟了位置感知,那么就會去獲取row所在的region的地址 HRegionLocation loc = null; loc = locator.getRegionLocation(rowKey); InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); // 創建writer,指定了偏向節點 wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa}) } else { // 創建writer wl = getNewWriter(tableNameBytes, family, conf, null); } } wl.writer.append(kv); wl.written += length; this.previousRow = rowKey; } private void rollWriters(WriterLength writerLength) throws IOException { if (writerLength != null) { // 關閉當前writer closeWriter(writerLength); } else { // 關閉所有family對應的writer for (WriterLength wl : this.writers.values()) { closeWriter(wl); } } this.rollRequested = false; } private void closeWriter(WriterLength wl) throws IOException { if (wl.writer != null) { close(wl.writer); } wl.writer = null; wl.written = 0; }}

    RecordWriter在寫入數據時,如果遇到一條 row key 和 value 都為 null 的數據時,這條數據有著特殊的含義,表示writer應該立即 flush。在每次創建RecordWriter時,它會根據此時row key 的值,找到所屬 Region 的服務器地址,然后盡量在這臺服務器上,創建新的HFile文件。

    加載 HFile

    上面生成完 HFile 之后,我們還需要調用第二條命令完成加載 HFile 過程。這個過程分為兩步,切割數據量大的 HFile 文件和發送加載請求讓服務器完成。

    切割 HFile

    首先它會遍歷目錄下的每個 HFile ,

  • 首先檢查 HFile 里面數據的 family 在 Hbase 表里是否存在。

  • 獲取HFile 數據的起始 row key,找到 Hbase 里對應的 Region,然后比較兩者之間的 row key 范圍

  • 如果 HFile 的 row key 范圍比 Region 大,也就是 HFile 的結束 row key 比這個 Region 的 結束 row Key 大,那么需要將這個 HFile 切割成兩份,切割值為 Region 的結束 row key。

  • 繼續從上一部切割生成的兩份HFile中,選擇第二份 HFile(它的row key 大于 Regioin 的結束 row key),將它繼續按照第二步切割,直到所有HFile的 row key范圍都能在一個Region里。

  • 在割切HFile的過程中,還會檢查 column family 對應的 HFile數目。如果一個 column family 對應的 HFile 數目過多,默認數目為32,程序就會報錯。但是這個值通過指定 hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily,來設置更大的值。

    發送加載請求

    當完成了HFile的切割后,最后的導入動作是發送 BulkLoadHFileRequest 請求給 Hbase 服務端。Hbase 服務端會處理該請求,完成HFile加載。

    其他

    至于我研究 Hbase Bulkload 的原因,是在使用過程中發生了 Out Of Memory 的錯誤。雖然經過排查,發現和 Hbase Bulkload 的原理沒什么關系,不過在此也順便提一下,希望能幫到遇到類似情況的人。首先說下我使用的Hadoop 版本是 CDH 5.12.2。

    經過排查,發現是因為 Hbase Bulkload 底層用的 MapReduce 模式為本地模式,而不是集群 Yarn 的方式。我們知道 MapReduce 程序選擇哪一種方式,可以通過 mapreduce.framework.name 配置項指定。雖然在 CDH 的 Yarn 配置頁面里,設置了該配置為 yarn,但是 Hbase Bulkload 仍然使用本地模式。后來發現 Yarn 組件下有個 Gateway 的角色實例,這是個特殊的角色,它負責 Yarn 客戶端的配置部署。而恰好這臺主機沒有安裝,所以在使用 Hbase Bulkload 時,沒有讀取到 Yarn 的配置。解決方法是在 CDH 界面添加 Gateway 實例就好了。

    總結

    以上是生活随笔為你收集整理的mapreduce原理_Hbase Bulkload 原理面试必备的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。