日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

HBase-1.2.4LruBlockCache实现分析(一)

發(fā)布時間:2023/11/29 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HBase-1.2.4LruBlockCache实现分析(一) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、簡介

? ? ??BlockCache是HBase中的一個重要特性,相比于寫數(shù)據(jù)時緩存為Memstore,讀數(shù)據(jù)時的緩存則為BlockCache。
? ? ? LruBlockCache是HBase中BlockCache的默認實現(xiàn),它采用嚴格的LRU算法來淘汰Block。

二、緩存級別

? ? ? 目前有三種緩存級別,定義在BlockPriority中,如下:

public enum BlockPriority {/*** Accessed a single time (used for scan-resistance)*/SINGLE,/*** Accessed multiple times*/MULTI,/*** Block from in-memory store*/MEMORY }

? ? ? 1、SINGLE:主要用于scan等,避免大量的這種一次的訪問導致緩存替換;

? ? ? 2、MULTI:多次緩存;

? ? ? 3、MEMORY:常駐緩存的,比如meta信息等。

三、緩存實現(xiàn)分析

? ? ??LruBlockCache緩存的實現(xiàn)在方法cacheBlock()中,實現(xiàn)邏輯如下:

? ? ? 1、首先需要判斷需要緩存的數(shù)據(jù)大小是否超過最大塊大小,按照2%的頻率記錄warn類型log并返回;

? ? ? 2、從緩存map中根據(jù)cacheKey嘗試獲取已緩存數(shù)據(jù)塊cb;

? ? ? 3、如果已經(jīng)緩存過,比對下內(nèi)容,如果內(nèi)容不一樣,拋出異常,否則記錄warn類型log并返回;

? ? ? 4、獲取當前緩存大小currentSize,獲取可以接受的緩存大小currentAcceptableSize,計算硬性限制大小hardLimitSize;

? ? ? 5、如果當前大小超過硬性限制,當回收沒在執(zhí)行時,執(zhí)行回收并返回,否則直接返回;

? ? ? 6、利用cacheKey、數(shù)據(jù)buf等構造Lru緩存數(shù)據(jù)塊實例cb;

? ? ? 7、將cb放置入map緩存中;

? ? ? 8、元素個數(shù)原子性增1;

? ? ? 9、如果新大小超過當前可以接受的大小,且未執(zhí)行回收過程中,執(zhí)行內(nèi)存回收。

? ? ? 詳細代碼如下,可自行閱讀分析:

// BlockCache implementation/*** Cache the block with the specified name and buffer.* <p>* It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)* this can happen, for which we compare the buffer contents.* @param cacheKey block's cache key* @param buf block buffer* @param inMemory if block is in-memory* @param cacheDataInL1*/@Overridepublic void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,final boolean cacheDataInL1) {// 首先需要判斷需要緩存的數(shù)據(jù)大小是否超過最大塊大小if (buf.heapSize() > maxBlockSize) {// If there are a lot of blocks that are too// big this can make the logs way too noisy.// So we log 2%if (stats.failInsert() % 50 == 0) {LOG.warn("Trying to cache too large a block "+ cacheKey.getHfileName() + " @ "+ cacheKey.getOffset()+ " is " + buf.heapSize()+ " which is larger than " + maxBlockSize);}return;}// 從緩存map中根據(jù)cacheKey嘗試獲取已緩存數(shù)據(jù)塊LruCachedBlock cb = map.get(cacheKey);if (cb != null) {// 如果已經(jīng)緩存過// compare the contents, if they are not equal, we are in big troubleif (compare(buf, cb.getBuffer()) != 0) {// 比對緩存內(nèi)容,如果不相等,拋出異常,否則記錄warn日志throw new RuntimeException("Cached block contents differ, which should not have happened."+ "cacheKey:" + cacheKey);}String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";LOG.warn(msg);return;}// 獲取當前緩存大小long currentSize = size.get();// 獲取可以接受的緩存大小long currentAcceptableSize = acceptableSize();// 計算硬性限制大小long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);if (currentSize >= hardLimitSize) {// 如果當前大小超過硬性限制,當回收沒在執(zhí)行時,執(zhí)行回收并返回stats.failInsert();if (LOG.isTraceEnabled()) {LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)+ " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."+ " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"+ cacheKey + " into LruBlockCache.");}if (!evictionInProgress) {// 當回收沒在執(zhí)行時,執(zhí)行回收并返回runEviction();}return;}// 利用cacheKey、數(shù)據(jù)buf等構造Lru緩存數(shù)據(jù)塊實例cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);long newSize = updateSizeMetrics(cb, false);// 放置入map緩存中map.put(cacheKey, cb);// 元素個數(shù)原子性增1long val = elements.incrementAndGet();if (LOG.isTraceEnabled()) {long size = map.size();assertCounterSanity(size, val);}// 如果新大小超過當前可以接受的大小,且未執(zhí)行回收過程中if (newSize > currentAcceptableSize && !evictionInProgress) {runEviction();// 執(zhí)行內(nèi)存回收}}四、淘汰緩存實現(xiàn)分析

? ? ? 淘汰緩存的實現(xiàn)方式有兩種:

? ? ? 1、第一種是在主線程中執(zhí)行緩存淘汰;

? ? ? 2、第二種是在一個專門的淘汰線程中通過持有對外部類LruBlockCache的弱引用WeakReference來執(zhí)行緩存淘汰。

? ? ? 應用那種方式,取決于構造函數(shù)中的boolean參數(shù)evictionThread,如下:

if(evictionThread) {this.evictionThread = new EvictionThread(this);this.evictionThread.start(); // FindBugs SC_START_IN_CTOR} else {this.evictionThread = null;}? ? ? 而在執(zhí)行淘汰緩存的runEviction()方法中,有如下判斷:

/*** Multi-threaded call to run the eviction process.* 多線程調(diào)用以執(zhí)行回收過程*/private void runEviction() {if(evictionThread == null) {// 如果未指定回收線程evict();} else {// 如果執(zhí)行了回收線程evictionThread.evict();}}? ? ? ? 而EvictionThread的evict()實現(xiàn)如下:

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",justification="This is what we want")public void evict() {synchronized(this) {this.notifyAll();}}? ? ? ? 通過synchronized獲取EvictionThread線程的對象鎖,然后主線程通過回收線程對象的notifyAll喚醒EvictionThread線程,那么這個線程是何時wait的呢?答案就在其run()方法中,notifyAll()之后,線程run()方法得以繼續(xù)執(zhí)行:

@Overridepublic void run() {enteringRun = true;while (this.go) {synchronized(this) {try {this.wait(1000 * 10/*Don't wait for ever*/);} catch(InterruptedException e) {LOG.warn("Interrupted eviction thread ", e);Thread.currentThread().interrupt();}}LruBlockCache cache = this.cache.get();if (cache == null) break;cache.evict();}}? ? ? ? 線程會wait10s,放棄對象鎖,在notifyAll()后,繼續(xù)執(zhí)行后面的淘汰流程,即:

/*** Eviction method.*/void evict() {// Ensure only one eviction at a time// 通過可重入互斥鎖ReentrantLock確保同一時刻只有一個回收在執(zhí)行if(!evictionLock.tryLock()) return;try {// 標志位,是否正在進行回收過程evictionInProgress = true;// 當前緩存大小long currentSize = this.size.get();// 計算應該釋放的緩沖大小bytesToFreelong bytesToFree = currentSize - minSize();if (LOG.isTraceEnabled()) {LOG.trace("Block cache LRU eviction started; Attempting to free " +StringUtils.byteDesc(bytesToFree) + " of total=" +StringUtils.byteDesc(currentSize));}// 如果需要回收的大小小于等于0,直接返回if(bytesToFree <= 0) return;// Instantiate priority buckets// 實例化優(yōu)先級隊列:single、multi、memoryBlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,singleSize());BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,multiSize());BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,memorySize());// Scan entire map putting into appropriate buckets// 掃描緩存,分別加入上述三個優(yōu)先級隊列for(LruCachedBlock cachedBlock : map.values()) {switch(cachedBlock.getPriority()) {case SINGLE: {bucketSingle.add(cachedBlock);break;}case MULTI: {bucketMulti.add(cachedBlock);break;}case MEMORY: {bucketMemory.add(cachedBlock);break;}}}long bytesFreed = 0;if (forceInMemory || memoryFactor > 0.999f) {// 如果memoryFactor或者InMemory緩存超過99.9%,long s = bucketSingle.totalSize();long m = bucketMulti.totalSize();if (bytesToFree > (s + m)) {// 如果需要回收的緩存超過則全部回收Single、Multi中的緩存大小和,則全部回收Single、Multi中的緩存,剩余的則從InMemory中回收// this means we need to evict blocks in memory bucket to make room,// so the single and multi buckets will be emptiedbytesFreed = bucketSingle.free(s);bytesFreed += bucketMulti.free(m);if (LOG.isTraceEnabled()) {LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +" from single and multi buckets");}// 剩余的則從InMemory中回收bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);if (LOG.isTraceEnabled()) {LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +" total from all three buckets ");}} else {// 否則,不需要從InMemory中回收,按照如下策略回收Single、Multi中的緩存:嘗試讓single-bucket和multi-bucket的比例為1:2// this means no need to evict block in memory bucket,// and we try best to make the ratio between single-bucket and// multi-bucket is 1:2long bytesRemain = s + m - bytesToFree;if (3 * s <= bytesRemain) {// single-bucket足夠小,從multi-bucket中回收// single-bucket is small enough that no eviction happens for it// hence all eviction goes from multi-bucketbytesFreed = bucketMulti.free(bytesToFree);} else if (3 * m <= 2 * bytesRemain) {// multi-bucket足夠下,從single-bucket中回收// multi-bucket is small enough that no eviction happens for it// hence all eviction goes from single-bucketbytesFreed = bucketSingle.free(bytesToFree);} else {// single-bucket和multi-bucket中都回收,且盡量滿足回收后比例為1:2// both buckets need to evict some blocksbytesFreed = bucketSingle.free(s - bytesRemain / 3);if (bytesFreed < bytesToFree) {bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);}}}} else {// 否則,從三個隊列中循環(huán)回收PriorityQueue<BlockBucket> bucketQueue =new PriorityQueue<BlockBucket>(3);bucketQueue.add(bucketSingle);bucketQueue.add(bucketMulti);bucketQueue.add(bucketMemory);int remainingBuckets = 3;BlockBucket bucket;while((bucket = bucketQueue.poll()) != null) {long overflow = bucket.overflow();if(overflow > 0) {long bucketBytesToFree = Math.min(overflow,(bytesToFree - bytesFreed) / remainingBuckets);bytesFreed += bucket.free(bucketBytesToFree);}remainingBuckets--;}}if (LOG.isTraceEnabled()) {long single = bucketSingle.totalSize();long multi = bucketMulti.totalSize();long memory = bucketMemory.totalSize();LOG.trace("Block cache LRU eviction completed; " +"freed=" + StringUtils.byteDesc(bytesFreed) + ", " +"total=" + StringUtils.byteDesc(this.size.get()) + ", " +"single=" + StringUtils.byteDesc(single) + ", " +"multi=" + StringUtils.byteDesc(multi) + ", " +"memory=" + StringUtils.byteDesc(memory));}} finally {// 重置標志位,釋放鎖等stats.evict();evictionInProgress = false;evictionLock.unlock();}}

? ? ? ? 邏輯比較清晰,如下:

? ? ? ? 1、通過可重入互斥鎖ReentrantLock確保同一時刻只有一個回收在執(zhí)行;

? ? ? ? 2、設置標志位evictionInProgress,是否正在進行回收過程為true;

? ? ? ? 3、獲取當前緩存大小currentSize;

? ? ? ? 4、計算應該釋放的緩沖大小bytesToFree:currentSize - minSize();

? ? ? ? 5、如果需要回收的大小小于等于0,直接返回;

? ? ? ? 6、實例化優(yōu)先級隊列:single、multi、memory;

? ? ? ? 7、掃描緩存,分別加入上述三個優(yōu)先級隊列;

? ? ? ? 8、如果forceInMemory或者InMemory緩存超過99.9%:

? ? ? ? ? ? ? 8.1、如果需要回收的緩存超過則全部回收Single、Multi中的緩存大小和,則全部回收Single、Multi中的緩存,剩余的則從InMemory中回收(this means we need to evict blocks in memory bucket to make room,so the single and multi buckets will be emptied):

? ? ? ? ? ? ? 8.2、否則,不需要從InMemory中回收,按照如下策略回收Single、Multi中的緩存:嘗試讓single-bucket和multi-bucket的比例為1:2:

? ? ? ? ? ? ? ? ? ? ? ? 8.2.1、?single-bucket足夠小,從multi-bucket中回收;

? ? ? ? ? ? ? ? ? ? ? ? 8.2.2、?multi-bucket足夠小,從single-bucket中回收;

? ? ? ? ? ? ? ? ? ? ? ? 8.2.3、single-bucket和multi-bucket中都回收,且盡量滿足回收后比例為1:2;

? ? ? ? 9、否則,從三個隊列中循環(huán)回收;

? ? ? ? 10、最后,重置標志位,釋放鎖等。

四、實例化

? ? ? ? 參見《HBase-1.2.4 Allow block cache to be external分析》最后。







總結

以上是生活随笔為你收集整理的HBase-1.2.4LruBlockCache实现分析(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。