日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

LevelDB 源码剖析(九)DBImpl模块:Open、Get、Put、Delete、Write

發布時間:2024/4/11 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 LevelDB 源码剖析(九)DBImpl模块:Open、Get、Put、Delete、Write 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • Open
  • Get
  • Put、Delete、Write

Open

數據庫 Open 操作主要用于創建新的 LevelDB 數據庫或打開一個已存在的數據庫。Open 操作的主要函數共需傳遞 3 個參數:兩個輸入參數 options 與 dbname,一個輸出參數 dbptr。

首先我們來看看它的代碼:

// https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {*dbptr = nullptr;//初始化dbimplDBImpl* impl = new DBImpl(options, dbname);impl->mutex_.Lock();VersionEdit edit;//嘗試恢復之前已經存在的數據庫文件中的數據bool save_manifest = false;Status s = impl->Recover(&edit, &save_manifest);//判斷Memtable是否為空if (s.ok() && impl->mem_ == nullptr) {//創建新的Log和MemTableuint64_t new_log_number = impl->versions_->NewFileNumber();WritableFile* lfile;s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),&lfile);if (s.ok()) {edit.SetLogNumber(new_log_number);impl->logfile_ = lfile;impl->logfile_number_ = new_log_number;impl->log_ = new log::Writer(lfile);impl->mem_ = new MemTable(impl->internal_comparator_);impl->mem_->Ref();}}//判斷是否需要保存Manifest文件if (s.ok() && save_manifest) {edit.SetPrevLogNumber(0); edit.SetLogNumber(impl->logfile_number_);//生成新的版本s = impl->versions_->LogAndApply(&edit, &impl->mutex_);}if (s.ok()) {//請理無用的文件impl->RemoveObsoleteFiles();//嘗試進行Compactionimpl->MaybeScheduleCompaction();}impl->mutex_.Unlock();if (s.ok()) {assert(impl->mem_ != nullptr);*dbptr = impl;} else {delete impl;}return s; }

具體的實現流程如下圖所示:

Open執行流程


  • 初始化一個 DBImpl 的對象 impl,將相關的參數選項 options 與數據庫名稱 dbname 作為構造函數的參數。
  • 調用 DBImpl 對象的 Recover 函數,嘗試恢復之前存在的數據庫文件數據。
  • 進行 Recover 操作后,判斷 impl 對象中的 MemTable 對象指針 mem_ 是否為空,如果為空,則進入第 4 步,不為空則進入第 5 步。
  • 創建新的 Log 文件以及對應的 MemTable 對象。這一步主要分別實例化 log::Writer 和 MemTable 兩個對象,并賦值給 impl 中對應的成員變量,后續通過 impl 中的成員變量操作 Log 文件和 MemTable。
  • 判斷是否需要保存 Manifest 相關信息,如果需要,則保存相關信息。
  • 判斷前面步驟是否都成功了,如果成功,則調用 DeleteObsoleteFiles 函數對一些過時文件進行刪除,且調用 MaybeScheduleCompaction 函數嘗試進行數據文件的 Compaction 操作。
  • Get

    Get 主要用于從 LevelDB 中獲取對應的鍵-值對數據,它是單個數據讀取的主要接口。Get 的主要參數為數據讀參數選項 options、鍵 key,以及一個用于返回數據值的 string 類型指針 value。其代碼如下:

    // https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DBImpl::Get(const ReadOptions& options, const Slice& key,std::string* value) {Status s;MutexLock l(&mutex_);SequenceNumber snapshot;//獲取序列號并賦值給snapshotif (options.snapshot != nullptr) {snapshot =static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();} else {snapshot = versions_->LastSequence();}MemTable* mem = mem_;MemTable* imm = imm_;Version* current = versions_->current();mem->Ref();if (imm != nullptr) imm->Ref();current->Ref();bool have_stat_update = false;Version::GetStats stats;{mutex_.Unlock();//首先查找memtableLookupKey lkey(key, snapshot);if (mem->Get(lkey, value, &s)) {//如果查找不到,接著查找immutable} else if (imm != nullptr && imm->Get(lkey, value, &s)) {//如果還是沒找到,則繼續查找SSTable} else {s = current->Get(options, lkey, value, &stats);have_stat_update = true;}mutex_.Lock();}if (have_stat_update && current->UpdateStats(stats)) {MaybeScheduleCompaction();}mem->Unref();if (imm != nullptr) imm->Unref();current->Unref();return s; }

    具體的實現流程如下圖所示:

    Get執行流程


    Get 在查詢讀取數據時,依次從 MemTable、Immutable MemTable 以及當前保存的 SSTable 文件中進行查找。如果在 MemTabel 中找到,立即返回對應的數值,如果沒有找到,再從 Immutable MemTable 中查找。而如果Immutable MemTable 中還是沒有找到,則會從持久化的文件 SSTable 中查找,直到找出該鍵對應的數值為止。

    SequenceNumber 有什么用呢?

    其主要作用是對 DB 的整個存儲空間進行時間刻度上的序列備份,即要從 DB 中獲取某一個數據,不僅需要其對應的鍵 key,而且需要其對應的時間序列號。對數據庫進行寫操作會改變序列號,每進行一次寫操作,則序列號加 1。


    Put、Delete、Write

    Put 主要有3個參數:寫操作參數 opt、操作數據的 key 與操作數據新值 value。其代碼如下:

    // https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {return DB::Put(o, key, val); }Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {WriteBatch batch;batch.Put(key, value);return Write(opt, &batch); }

    從上面的代碼可以看出, Put 其實也是將單條數據的操作變更為一個批量操作,然后調用 Write 進行實現。

    Delete 不會直接刪除數據,而是在對應位置插入一個 key 的刪除標志,然后在后續的 Compaction 過程中才最終去除這條 key-value 記錄。其代碼如下:

    // https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DBImpl::Delete(const WriteOptions& options, const Slice& key) {return DB::Delete(options, key); }Status DB::Delete(const WriteOptions& opt, const Slice& key) {WriteBatch batch;batch.Delete(key);return Write(opt, &batch); }

    從上面的代碼可以看出 Delete 的本質其實也是一個 Write 操作。

    在介紹 Write 之前,首先介紹其封裝的消息結構 Writer 與任務隊列 writes_。

    Writer 用于保存基本信息,如批量操作 batch、狀態信息 status、是否同步 sync、是否完成 done 以及用于多線程操作的條件變量cv 。

    // https://github.com/google/leveldb/blob/master/db/db_impl.ccstruct DBImpl::Writer {explicit Writer(port::Mutex* mu): batch(nullptr), sync(false), done(false), cv(mu) {}Status status; //狀態WriteBatch* batch; //批量寫入對象bool sync;//表示是否已經同步了bool done;//表示是否已經處理完成port::CondVar cv;//這個是條件變量 };

    接著看看任務隊列 writers_,該隊列對象中的元素節點為 Writer 對象指針。可見 writes_ 與寫操作的緩存空間有關,批量操作請求均存儲在這個隊列中,按順序執行,已完成的出隊,而未執行的則在這個隊列中處于等待狀態。

    std::deque<Writer*> writers_ GUARDED_BY(mutex_);

    writers_隊列示意圖


    Write 主要有兩個參數:WriteOptions 對象與 WriteBatch 對象。WriteOptions 主要包含一些關于寫操作的參數選項,而WriteBatch對象,相當于一個緩沖區,用于定義、保存一系列的批量操作。其代碼實現如下:

    // https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {//實例化一個Writer對象b并插入writers_隊列中等待執行Writer w(&mutex_);w.batch = updates;w.sync = options.sync;w.done = false;MutexLock l(&mutex_);writers_.push_back(&w);while (!w.done && &w != writers_.front()) {w.cv.Wait();}if (w.done) {return w.status;}Status status = MakeRoomForWrite(updates == nullptr);uint64_t last_sequence = versions_->LastSequence();Writer* last_writer = &w;if (status.ok() && updates != nullptr) { //合并寫入操作WriteBatch* write_batch = BuildBatchGroup(&last_writer);WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);last_sequence += WriteBatchInternal::Count(write_batch);{mutex_.Unlock();//將更新寫入日志文件中,并且將日志文件寫入磁盤中status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));bool sync_error = false;if (status.ok() && options.sync) {status = logfile_->Sync();if (!status.ok()) {sync_error = true;}}//將更新寫入Memtable中if (status.ok()) {status = WriteBatchInternal::InsertInto(write_batch, mem_);}mutex_.Lock();if (sync_error) {RecordBackgroundError(status);}}if (write_batch == tmp_batch_) tmp_batch_->Clear();versions_->SetLastSequence(last_sequence);}//由于和并寫入操作一次可能會處理多個writer_隊列中的元素,因此將所有已經處理的元素狀態進行變更,并且發送signal信號while (true) {Writer* ready = writers_.front();writers_.pop_front();if (ready != &w) {ready->status = status;ready->done = true;ready->cv.Signal();}if (ready == last_writer) break;}//通知writers_隊列中的第一個元素,發送signal信號if (!writers_.empty()) {writers_.front()->cv.Signal();}return status; }

    具體的實現流程如下圖所示:

    Write執行流程


    • 實例化一個 Writer 對象,并將其插入所示的 writers_ 隊列中。

    • 通過 Writer 中的條件變量 cv 調用 wait 方法將該線程掛起,等待其他線程發送 signal 信號,并且等待隊列前面的 Writer 操作全部執行完畢:

      • 如果線程收到了 signal 信號:則解除阻塞。
      • 如果線程沒有收到了 signal 信號:說明隊列前面仍有其他的 Writer 操作,那么該線程會再次調用 wait 方法實現阻塞,從而保證了 Writer 操作按照隊列生成次序執行。
    • 當輪到本線程操作時,首先通過 MakeRoomForWrite 函數進行內存空間分配。

    • 當獲取到需要的內存后,根據一系列的批量操作,對 Log 文件以及 MemTable 分別進行更新。

    • 依據批量操作的數目更新 SequenceNumber。

    • 通過 Writer 中的條件變量 cv 發送 signal 信號,以通知處于等待狀態的其他線程開始執行。

    總結

    以上是生活随笔為你收集整理的LevelDB 源码剖析(九)DBImpl模块:Open、Get、Put、Delete、Write的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。