日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Rocksdb 通过ingestfile 来支持高效的离线数据导入

發(fā)布時(shí)間:2023/11/27 生活经验 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rocksdb 通过ingestfile 来支持高效的离线数据导入 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

    • 前言
    • 使用方式
    • 實(shí)現(xiàn)原理
    • 總結(jié)

前言

很多時(shí)候,我們使用數(shù)據(jù)庫(kù)時(shí)會(huì)有離線向數(shù)據(jù)庫(kù)導(dǎo)入數(shù)據(jù)的需求。比如大量用戶在本地的一些離線數(shù)據(jù),想要將這一些數(shù)據(jù)導(dǎo)入到已有的數(shù)據(jù)庫(kù)中;或者說NewSQL場(chǎng)景中部分機(jī)器離線,重新上線之后的數(shù)據(jù)增量/全量同步 等場(chǎng)景。這個(gè)時(shí)候 我們并不想要讓這一些數(shù)據(jù)占用過多的系統(tǒng)資源,更不希望他們對(duì)正常的線上業(yè)務(wù)有影響,所以盡可能高效得完成這一些數(shù)據(jù)的同步就需要深入設(shè)計(jì)一番。

而如果底層引擎使用的是rocksdb,那就非常省事了,只需要組織好你們的數(shù)據(jù)調(diào)用接口就完事了,剩下的導(dǎo)入過程由引擎完成。 tikv便是通過 rocksdb的這個(gè)功能完成集群異常恢復(fù)之后 region之間的全量增量同步的。回到今天我們要討論的主題,便是rocksdb的這個(gè)數(shù)據(jù)導(dǎo)入過程是如何盡可能快、盡可能高效得完成的。

使用方式

講解實(shí)現(xiàn)原理之前我們先看看如何使用這個(gè)功能,功能的易用性也很重要,用戶還是希望盡可能得少寫代碼來完成這個(gè)工作。使用上主要是兩部分:創(chuàng)建SST文件 和 導(dǎo)入SST文件。

  • 創(chuàng)建sst文件:這一步主要是通過一個(gè)sst_filter_writer,將需要導(dǎo)入的 k/v 數(shù)據(jù)轉(zhuǎn)換成sst文件

    需要注意的是:

    1. 用戶k/v 數(shù)據(jù)需要按照options.comparator 嚴(yán)格有序,默認(rèn)是按照key的字典序
    2. 這里的options 建議和db寫入的options用一套(壓縮選項(xiàng),sst文件相關(guān)選項(xiàng)等)
    Options options;SstFileWriter sst_file_writer(EnvOptions(), options);
    // 指定形成的sst文件的路徑
    std::string file_path = "/home/usr/file1.sst";// open file_path
    Status s = sst_file_writer.Open(file_path);
    for (...) {// 寫入sst,用戶保證k/v 的順序s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n", key.c_str(),s.ToString().c_str());return 1;}
    }// 完成寫入
    s = sst_file_writer.Finish();
    
  • 導(dǎo)入sst文件:這個(gè)步驟就是將創(chuàng)建好的一個(gè)或者多個(gè)sst文件導(dǎo)入到db中,也允許向多個(gè)cf中導(dǎo)入

IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 導(dǎo)入數(shù)據(jù)
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);

使用還是比較簡(jiǎn)單的,整體的使用過程如下:

#include <iostream>
#include <vector>#include <gflags/gflags.h>#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>#define DATA_SIZE 10
#define VALUE_SIZE 1024using namespace std;// 比較函數(shù)
bool cmp(pair<string, string> str1,pair<string, string> str2) {if(str1.first < str2.first) {return true;} else if (str1.first == str2.first && str1.second < str2.second) {return true;} else {return false;}
}// 隨機(jī)字符串
static string rand_data(long data_range) {char buff[30];unsigned long long num = 1;for (int i = 0;i < 4; ++i) {num *= (unsigned long long )rand();}sprintf(buff, "%llu", num % (unsigned long long)data_range );string data(buff);return data;
}// 構(gòu)造有序數(shù)據(jù)
void construct_data(vector<pair<string,string>> &input) {int i;string key;string value;for (i = 0;i < DATA_SIZE; i++) {if(key == "0") {continue;}key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);input.push_back(make_pair(key, value));}
}void traverse_data(vector<pair<string,string>> input) {int i;for(auto data : input) {cout << data.first << " " << data.second << endl;}
}// 創(chuàng)建sst文件
int create_sst(string file_path) {vector<pair<string,string>> input;vector<pair<string,string>>::iterator input_itr;rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);rocksdb::Status s = sst_file_writer.Open(file_path);if (!s.ok()) {printf("Error while opening file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}// 需要保證數(shù)據(jù)有序后再寫入construct_data(input);sort(input.begin(), input.end(), cmp);traverse_data(input);// Insert rows into the SST file, note that inserted keys must be // strictly increasing (based on options.comparator)for (input_itr = input.begin(); input_itr != input.end();input_itr ++) {rocksdb::Slice key(input_itr->first);rocksdb::Slice value(input_itr->second);s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n",key.ToString().c_str(),s.ToString().c_str());return 1;}}// Close the files = sst_file_writer.Finish();if (!s.ok()) {printf("Error while finishing file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}return 0;
}static rocksdb::DB *db;void create_db() {rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);if (!s.ok()) {printf("Open db failed : %s\n", s.ToString().c_str());return;}
}void db_write(int num_keys) {rocksdb::WriteOptions write_option;write_option.sync = true;rocksdb::Slice key;rocksdb::Slice value;rocksdb::Status s;int i;printf("begin write \n");for (i = 0;i < num_keys; i++) {key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);s = db->Put(write_option, key, value);if (!s.ok()) {printf("Put db failed : %s\n", s.ToString().c_str());return;}}db->Flush(rocksdb::FlushOptions());printf("finish write \n");
}int main() {// 先寫入一批數(shù)據(jù)create_db();db_write(100000);// 創(chuàng)建sst文件if (create_sst("./test.sst") == 0) {printf("creates sst success !\n");} else {printf("creates sst failed !\n");}// 導(dǎo)入數(shù)據(jù)rocksdb::IngestExternalFileOptions ifo;// Ingest the 2 passed SST files into the DBprintf("Ingest sst !\n");rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);if (!s.ok()) {printf("Error while adding file test.sst , Error %s\n",s.ToString().c_str());return 1;}return 0;
}

運(yùn)行輸出如下:

begin write 
finish write
# consturct data,需按照字典序,如果沒有按照字典序構(gòu)造的話會(huì)報(bào)錯(cuò)
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !

通過db日志可以看到我們創(chuàng)建的sst文件test.sst被成功導(dǎo)入到db,形成了./db/000020.sst,且在db目錄中。

╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)╰─$ ls db
000017.log               000020.sst               IDENTITY                 LOG                      LOG.old.1618643738564935 OPTIONS-000008
000019.sst               CURRENT                  LOCK                     LOG.old.1618123487361092 MANIFEST-000013          OPTIONS-000016

實(shí)現(xiàn)原理

從如何使用這個(gè)功能上我們能夠感覺到這一些數(shù)據(jù)并不是通過rocksdb正常的I/O流程寫入的。如果使用正常的接口,那我們用戶不需要排序,而是直接通過db->Put接口將k/v寫入,凡事都有但是,但是這樣來導(dǎo)入離線數(shù)據(jù)在rocksdb內(nèi)部后續(xù)的flush/compaction 都會(huì)消耗大量的系統(tǒng)資源,而這并不是我們想要的高效。所以,rocksdb提供的ingest接口肯定不會(huì)讓這一些要導(dǎo)入的數(shù)據(jù)消耗過多的資源,接下來我們一起看看底層的詳細(xì)實(shí)現(xiàn)。

為了更形象得告訴大家在rocksdb作為存儲(chǔ)引擎的場(chǎng)景,如果通過傳統(tǒng)的put接口導(dǎo)入數(shù)據(jù)會(huì)多出哪一些I/O,如下圖

其中紅色的尖頭 是ingest file 相比于傳統(tǒng)的put接口 少的I/O部分,可以說ingest方式導(dǎo)入數(shù)據(jù)極大得節(jié)約了整個(gè)系統(tǒng)資源的開銷(包括但不限于I/O , CPU 資源的開銷)。

下面主要介紹的是有了sst文件,接下來如何導(dǎo)入到db中的過程。關(guān)于通過sst_file_writer創(chuàng)建具體的sst文件的過程就不多說了,也就是按照sst文件的格式(datablock,index block…footer)等將有序的數(shù)據(jù)一個(gè)個(gè)添加進(jìn)去而已。

主要有如下幾步:

  1. 為待插入的sst文件創(chuàng)建file link到db目錄,或者直接拷貝進(jìn)去
  2. 停止寫入,需要保證即將導(dǎo)入的sst文件在db中擁有一個(gè)安全合理的seqno,如果持續(xù)寫入,那這個(gè)seqno可能不會(huì)全局遞增了。
  3. 檢查導(dǎo)入的sst文件是否和memtable中的key-range有重疊,有的話需要flush memtable
  4. 為這個(gè)sst文件 按照其key-range挑選一個(gè)合適的level放進(jìn)去
  5. 為這個(gè)問天添加一個(gè)全局的seqno
  6. 恢復(fù)db的寫入

其中停止寫入到恢復(fù)寫入這段時(shí)間對(duì)于用戶來說越小越好,所以ingest的性能很重要。

接下來看看詳細(xì)的源代碼實(shí)現(xiàn):

導(dǎo)入數(shù)據(jù)的函數(shù)入口是DBImpl::IngestExternalFiles

導(dǎo)入的sst文件最后都需要形成一個(gè)db內(nèi)部的sst文件,因?yàn)檫@個(gè)時(shí)候已經(jīng)停止寫入了,所以會(huì)從最新的sst文件編號(hào)之后取一個(gè)文件編號(hào),后續(xù)的其他要導(dǎo)入的sst文件會(huì)不斷追加。

Status DBImpl::IngestExternalFiles(const std::vector<IngestExternalFileArg>& args) {...// 構(gòu)造文件編號(hào)到next_file_number中Status status = ReserveFileNumbersBeforeIngestion(static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,pending_output_elem, &next_file_number);if (!status.ok()) {InstrumentedMutexLock l(&mutex_);ReleaseFileNumberFromPendingOutputs(pending_output_elem);return status;}...
}

有了在db內(nèi)部的合法文件編號(hào),我們就可以進(jìn)行文件遷移了,將待導(dǎo)入的sst文件遷移到db內(nèi)部已經(jīng)構(gòu)造好的sst文件編號(hào)之中。

會(huì)為每一個(gè)cf構(gòu)造一個(gè)ingest_job, 將待導(dǎo)入文件拷貝/移動(dòng)到 db內(nèi)部的sst文件中,這個(gè)過程是在接下來的Prepare函數(shù)中。

  uint64_t start_file_number = next_file_number;for (size_t i = 1; i != num_cfs; ++i) {start_file_number += args[i - 1].external_files.size();auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);// prepare 函數(shù)exec_results[i].second = ingestion_jobs[i].Prepare(args[i].external_files, start_file_number, super_version);exec_results[i].first = true;CleanupSuperVersion(super_version);}

看看Prepare的函數(shù)實(shí)現(xiàn):

  1. 拿著輸入的多個(gè)sst文件,如果有多個(gè),則需要檢查這一些文件之間是否有重疊key,有的話就不支持了(rocksdb除了l0,其他層不允許有重疊key)。
  2. 根據(jù)用戶指定的ingest option: move_files 是否為true,來將待導(dǎo)入文件move到db中, 如果move失敗了就拷貝文件。
Status ExternalSstFileIngestionJob::Prepare(const std::vector<std::string>& external_files_paths,uint64_t next_file_number, SuperVersion* sv) {// 解析文件信息for (const std::string& file_path : external_files_paths) {IngestedFileInfo file_to_ingest;status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);if (!status.ok()) {return status;}files_to_ingest_.push_back(file_to_ingest);}// 確保導(dǎo)入的多個(gè)sst文件之間沒有重疊......} else if (num_files > 1) {// Verify that passed files dont have overlapping rangesautovector<const IngestedFileInfo*> sorted_files;for (size_t i = 0; i < num_files; i++) {sorted_files.push_back(&files_to_ingest_[i]);}std::sort(sorted_files.begin(), sorted_files.end(),[&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {return sstableKeyCompare(ucmp, info1->smallest_internal_key,info2->smallest_internal_key) < 0;});// 如果有重疊的話,ingest也無法支持,因?yàn)樵赿b中大于level0的更高層level內(nèi)部的// sst文件之間是不允許有重疊的,加速更高層的二分查找。for (size_t i = 0; i < num_files - 1; i++) {if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,sorted_files[i + 1]->smallest_internal_key) >= 0) {files_overlap_ = true;break;}}}......// 根據(jù)用戶參數(shù)move文件if (ingestion_options_.move_files) {status = env_->LinkFile(path_outside_db, path_inside_db);...} else { // 否則就拷貝文件f.copy_file = true;}if (f.copy_file) {TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",nullptr);// CopyFile also sync the new file.status = CopyFile(env_, path_outside_db, path_inside_db, 0,db_options_.use_fsync);}...
}

到此,文件就已經(jīng)進(jìn)入到了rocksdb 之中,ingest_job的prepare流程就結(jié)束了。

接下來 就到了我們前面介紹總步驟的第二步,停止用戶對(duì)當(dāng)前db的寫入:

DBImpl::IngestExternalFilesWriteThread::EnterUnbatched

其中WriteThread::EnterUnbatched函數(shù)會(huì)讓當(dāng)前db的寫入線程都處于wait狀態(tài)。

接下來就是檢查當(dāng)前要導(dǎo)入的文件是否和memtable中的key-range有重疊,函數(shù)調(diào)用如下:

DBImpl::IngestExternalFilesExternalSstFileIngestionJob::NeedsFlushColumnFamilyData::RangesOverlapWithMemtables

這個(gè)函數(shù)ColumnFamilyData::RangesOverlapWithMemtables會(huì)拿著從ingest files中構(gòu)造好的key-range和memtable中的 key-range 進(jìn)行對(duì)比,如果有重疊key,則會(huì)將memtable flush置為true

Status ColumnFamilyData::RangesOverlapWithMemtables(const autovector<Range>& ranges, SuperVersion* super_version,bool* overlap) {...Status status;// 拿著ingest files的range中的每一個(gè)key,看是否能夠從memtable中找到for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {auto* vstorage = super_version->current->storage_info();auto* ucmp = vstorage->InternalComparator()->user_comparator();InternalKey range_start(ranges[i].start, kMaxSequenceNumber,kValueTypeForSeek);// 從memtable中找memtable_iter->Seek(range_start.Encode());status = memtable_iter->status();ParsedInternalKey seek_result;if (status.ok()) {if (memtable_iter->Valid() &&!ParseInternalKey(memtable_iter->key(), &seek_result)) {status = Status::Corruption("DB have corrupted keys");}}// 找到了,則置overlap為trueif (status.ok()) {if (memtable_iter->Valid() &&ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {*overlap = true;} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,ranges[i].limit)) {*overlap = true;}}}...
}

在后續(xù)的DBImpl::FlushMemTable函數(shù)中會(huì)flush memtable,不同的cf是分開進(jìn)行的

DBImpl::IngestExternalFilesDBImpl::FlushMemTable

接下來就開始了第四步和第五步的處理邏輯,需要為每一個(gè)落到db中的sst文件挑選合適的level以及分配全局seqno,處理邏輯在Run函數(shù)中:

DBImpl::IngestExternalFilesExternalSstFileIngestionJob::Run

主要處理邏輯如下:

一個(gè)一個(gè)ingest file進(jìn)行處理

  1. 選擇一個(gè)合適的level,將ingest file插入進(jìn)去
    如果user配置了allow_ingest_behind=true,即允許導(dǎo)入的數(shù)據(jù)直接插入到最后一層的文件位置,且ingest的時(shí)候配置的ingest option中ingest_behind=true,則會(huì)先嘗試插入到bottomest level,如果最后一層的文件和待插入的文件有重疊,則插入失敗。處理邏輯在CheckLevelForIngestedBehindFile函數(shù)之中。

    否則逐層遍歷,找到第一個(gè)和這一些key-range有重疊的level即可。函數(shù)AssignLevelAndSeqnoForIngestedFile

  2. 找到了合適的level的同時(shí)會(huì)記錄一個(gè)assigned_seqno,是在當(dāng)前last_sequence的基礎(chǔ)上+1得到的。函數(shù)AssignLevelAndSeqnoForIngestedFile之中。

  3. 為當(dāng)前ingest_file 寫入一個(gè)global seq no, 并執(zhí)行fsync/sync。函數(shù)AssignGlobalSeqnoForIngestedFile之中。

  4. 最后就是將當(dāng)完成更新的ingest file的元信息更新到VersionEdit之中。

接下來就進(jìn)入尾聲了:

  1. 將更新的VersionEdit寫入到MANIFEST文件之中
  2. 更新每個(gè)ingest file對(duì)應(yīng)的cf信息,并且調(diào)度compaction/flush, 因?yàn)橹癷ngest file時(shí)找的是有重疊key的一層。
  3. 恢復(fù)db的寫入
     	// 將`VersionEdit`寫入到MANIFEST文件之中status =versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,edit_lists, &mutex_, directories_.GetDbDir());}if (status.ok()) {for (size_t i = 0; i != num_cfs; ++i) {auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();if (!cfd->IsDropped()) {//更新每個(gè)ingest file對(duì)應(yīng)的cf信息,并且調(diào)度compaction/flush, 因?yàn)橹癷ngest file時(shí)找的是有重疊key的一層InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],*cfd->GetLatestMutableCFOptions());...}}}// 恢復(fù)db的寫入,喚醒db的其他所有的writerwrite_thread_.ExitUnbatched(&w);

到此,整個(gè)ingest就算是結(jié)束了。

總結(jié)

通過ingest的實(shí)現(xiàn),我們能夠看到rocksdb通過ingest的方式支持離線數(shù)據(jù)導(dǎo)入確實(shí)能夠極大得降低系統(tǒng)資源的開銷。不需要一個(gè)key在LSM中被反復(fù)的寫入、讀取。

總結(jié)

以上是生活随笔為你收集整理的Rocksdb 通过ingestfile 来支持高效的离线数据导入的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。