日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

influxdb数据过期_Influxdb Cluster下的数据写入

發(fā)布時間:2023/12/15 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 influxdb数据过期_Influxdb Cluster下的数据写入 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Cluster下的數(shù)據(jù)寫入

數(shù)據(jù)寫入的實現(xiàn)主要分析cluster/points_writer.go中的WritePoints函數(shù)的實現(xiàn)//?WritePoints?writes?across?multiple?local?and?remote?data?nodes?according?the?consistency?level.func?(w?*PointsWriter)?WritePoints(p?*WritePointsRequest)?error?{

w.statMap.Add(statWriteReq,?1)

w.statMap.Add(statPointWriteReq,?int64(len(p.Points)))????//2.1?先獲取RetentionPolicy

if?p.RetentionPolicy?==?""?{

db,?err?:=?w.MetaClient.Database(p.Database)????????if?err?!=?nil?{????????????return?err

}?else?if?db?==?nil?{????????????return?influxdb.ErrDatabaseNotFound(p.Database)

}

p.RetentionPolicy?=?db.DefaultRetentionPolicy

}????//?2.2?生成?shardMap

shardMappings,?err?:=?w.MapShards(p)????if?err?!=?nil?{????????return?err

}????//?Write?each?shard?in?it's?own?goroutine?and?return?as?soon

//?as?one?fails.

ch?:=?make(chan?error,?len(shardMappings.Points))????for?shardID,?points?:=?range?shardMappings.Points?{

//?2.3?寫入數(shù)據(jù)到Shard

go?func(shard?*meta.ShardInfo,?database,?retentionPolicy?string,?points?[]models.Point)?{

ch?

}(shardMappings.Shards[shardID],?p.Database,?p.RetentionPolicy,?points)

}????//?Send?points?to?subscriptions?if?possible.

ok?:=?false

//?We?need?to?lock?just?in?case?the?channel?is?about?to?be?nil'ed

w.mu.RLock()

select?{????case?w.subPoints?

ok?=?true

default:

}

w.mu.RUnlock()????if?ok?{

w.statMap.Add(statSubWriteOK,?1)

}?else?{

w.statMap.Add(statSubWriteDrop,?1)

}????//?2.4?等待寫入完成

for?range?shardMappings.Points?{

select?{????????case?

}

}

}????return?nil}上面的函數(shù)實現(xiàn)主要分如下幾個步驟

2.1 獲取對應的RetentionPolicy

2.2 生成ShardMap, 將各個point對應到相應ShardGroup中的Shard中, 這步很關鍵

2.3 按ShardId不同,開啟新的goroutine, 將points寫入相應的Shard,可能設計對寫入數(shù)據(jù)到其它的DataNode上;

2.4 等待寫入完成或退出

ShardMap的生成先講一下ShardGroup的概念

1.1 寫入Influxdb的每一條數(shù)據(jù)對帶有相應的time時間,每一個SharGroup都有自己的start和end時間,這個時間跨度是由用戶寫入時選取的RetentionPolicy時的ShardGroupDarution決定,這樣每條寫入的數(shù)據(jù)就必然僅屬于一個確定的ShardGroup中;

主要實現(xiàn)在cluster/points_writer.go中的MapShards中func?(w?*PointsWriter)?MapShards(wp?*WritePointsRequest)?(*ShardMapping,?error)?{????//?holds?the?start?time?ranges?for?required?shard?groups

timeRanges?:=?map[time.Time]*meta.ShardGroupInfo{}

rp,?err?:=?w.MetaClient.RetentionPolicy(wp.Database,?wp.RetentionPolicy)????if?err?!=?nil?{????????return?nil,?err

}????if?rp?==?nil?{????????return?nil,?influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)

}????for?_,?p?:=?range?wp.Points?{

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]?=?nil

}????//?holds?all?the?shard?groups?and?shards?that?are?required?for?writes

for?t?:=?range?timeRanges?{

sg,?err?:=?w.MetaClient.CreateShardGroup(wp.Database,?wp.RetentionPolicy,?t)????????if?err?!=?nil?{????????????return?nil,?err

}

timeRanges[t]?=?sg

}

mapping?:=?NewShardMapping()????for?_,?p?:=?range?wp.Points?{

sg?:=?timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh?:=?sg.ShardFor(p.HashID())

mapping.MapPoint(&sh,?p)

}????return?mapping,?nil}我們來拆解下上面函數(shù)的實現(xiàn)

3.1 掃描所有的points, 按時間確定我們需要多個ShardGroupfor?_,?p?:=?range?wp.Points?{

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]?=?nil

}

3.2 調(diào)用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在創(chuàng)建,創(chuàng)建過程涉及到將CreateShardGroup的請求發(fā)送給MetadataServer并等待本地更新到新的MetaData數(shù)據(jù);sg,?err?:=?w.MetaClient.CreateShardGroup(wp.Database,?wp.RetentionPolicy,?t)

3.3 分析ShardGroup的分配規(guī)則, 在services/meta/data.go中的CreateShardGroupfunc?(data?*Data)?CreateShardGroup(database,?policy?string,?timestamp?time.Time)?error?{

...????//?Require?at?least?one?replica?but?no?more?replicas?than?nodes.

//?確認復本數(shù),不能大于DataNode節(jié)點總數(shù)

replicaN?:=?rpi.ReplicaN????if?replicaN?==?0?{

replicaN?=?1

}?else?if?replicaN?>?len(data.DataNodes)?{

replicaN?=?len(data.DataNodes)

}????//?Determine?shard?count?by?node?count?divided?by?replication?factor.

//?This?will?ensure?nodes?will?get?distributed?across?nodes?evenly?and

//?replicated?the?correct?number?of?times.

//?根據(jù)復本數(shù)確定Shard數(shù)量

shardN?:=?len(data.DataNodes)?/?replicaN????//?Create?the?shard?group.

//?創(chuàng)建ShardGroup

data.MaxShardGroupID++

sgi?:=?ShardGroupInfo{}

sgi.ID?=?data.MaxShardGroupID

sgi.StartTime?=?timestamp.Truncate(rpi.ShardGroupDuration).UTC()

sgi.EndTime?=?sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()????//?Create?shards?on?the?group.

sgi.Shards?=?make([]ShardInfo,?shardN)????for?i?:=?range?sgi.Shards?{

data.MaxShardID++

sgi.Shards[i]?=?ShardInfo{ID:?data.MaxShardID}

}????//?Assign?data?nodes?to?shards?via?round?robin.

//?Start?from?a?repeatably?"random"?place?in?the?node?list.

//?ShardInfo中的Owners記錄了當前Shard所有復本所在DataNode的信息

//?分Shard的所有復本分配DataNode

//?使用data.Index作為基數(shù)確定開始的DataNode,然后使用?round?robin策略分配

//?data.Index:每次meta信息有更新,Index就會更新,?可以理解為meta信息的版本號

nodeIndex?:=?int(data.Index?%?uint64(len(data.DataNodes)))????for?i?:=?range?sgi.Shards?{

si?:=?&sgi.Shards[i]????????for?j?:=?0;?j?

nodeID?:=?data.DataNodes[nodeIndex%len(data.DataNodes)].ID

si.Owners?=?append(si.Owners,?ShardOwner{NodeID:?nodeID})

nodeIndex++

}

}????//?Retention?policy?has?a?new?shard?group,?so?update?the?policy.?Shard

//?Groups?must?be?stored?in?sorted?order,?as?other?parts?of?the?system

//?assume?this?to?be?the?case.

rpi.ShardGroups?=?append(rpi.ShardGroups,?sgi)

sort.Sort(ShardGroupInfos(rpi.ShardGroups))????return?nil

}

3.3 按每一個具體的point對應到ShardGroup中的一個Shard: 按point的HashID來對Shard總數(shù)取模,HashID是measurment + tag set的Hash值for?_,?p?:=?range?wp.Points?{

sg?:=?timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh?:=?sg.ShardFor(p.HashID())

mapping.MapPoint(&sh,?p)

}

....

func?(sgi?*ShardGroupInfo)?ShardFor(hash?uint64)?ShardInfo?{????return?sgi.Shards[hash%uint64(len(sgi.Shards))]

}

數(shù)據(jù)按一致性要求寫入過程簡述

1.1 根據(jù)一致性要求確認需要成功寫入幾份switch?consistency?{????//?對于ConsistencyLevelAny,?ConsistencyLevelOne只需要寫入一份即滿足一致性要求,返回客戶端

case?ConsistencyLevelAny,?ConsistencyLevelOne:

required?=?1

case?ConsistencyLevelQuorum:

required?=?required/2?+?1

}

1.2 根據(jù)Shard.Owners對應的DataNode, 向其中的每個DataNode寫入數(shù)據(jù),如果是本機,直接調(diào)用w.TSDBStore.WriteToShard寫入;如果非本機,調(diào)用err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);

1.3 寫入遠端失敗時,數(shù)據(jù)寫入HintedHandoff本地磁盤隊列多次重試寫到遠端,直到數(shù)據(jù)過期被清理;對于一致性要求是ConsistencyLevelAny, 寫入本地HintedHandoff成功,就算是寫入成功;w.statMap.Add(statWritePointReqHH,?int64(len(points)))

hherr?:=?w.HintedHandoff.WriteShard(shardID,?owner.NodeID,?points)????????????????if?hherr?!=?nil?{

ch?

}????????????????if?hherr?==?nil?&&?consistency?==?ConsistencyLevelAny?{

ch?

}

1.4 等待寫入超時或完成for?range?shard.Owners?{

select?{????????case?

w.statMap.Add(statWriteTimeout,?1)????????????//?return?timeout?error?to?caller

return?ErrTimeout????????case?result?:=?

if?result.Err?!=?nil?{????????????????if?writeError?==?nil?{

writeError?=?result.Err

}????????????????continue

}

wrote++????????????//?寫入已達到一致性要求,就立即返回

if?wrote?>=?required?{

w.statMap.Add(statWriteOK,?1)????????????????return?nil

}

}

}

HintedHandoff服務定義在services/hh/service.go中

寫入HintedHandoff中的數(shù)據(jù),按NodeID的不同寫入不同的目錄,每個目錄下又分多個文件,每個文件作為一個segment, 命名規(guī)則就是依次遞增的id, id的大小按序就是寫入的時間按從舊到新排序;

hitnedhandoff.png

HintedHandoff服務會針對每一個遠端DataNode創(chuàng)建NodeProcessor, 每個負責自己DataNode的寫入, 運行在一個單獨的goroutine中

在每個goroutine中,作兩件事:一個是定時清理過期的數(shù)據(jù),如果被清理掉的數(shù)據(jù)還沒有成功寫入到遠端,則會丟失;二是從文件讀取數(shù)據(jù)寫入到遠端;func?(n?*NodeProcessor)?run()?{

defer?n.wg.Done()

...????for?{

select?{????????case?

case?

n.Logger.Printf("failed?to?purge?for?node?%d:?%s",?n.nodeID,?err.Error())

}????????case?

limiter?:=?NewRateLimiter(n.RetryRateLimit)????????????for?{

c,?err?:=?n.SendWrite()????????????????if?err?!=?nil?{????????????????????if?err?==?io.EOF?{????????????????????????//?No?more?data,?return?to?configured?interval

currInterval?=?time.Duration(n.RetryInterval)

}?else?{

currInterval?=?currInterval?*?2

if?currInterval?>?time.Duration(n.RetryMaxInterval)?{

currInterval?=?time.Duration(n.RetryMaxInterval)

}

}????????????????????break

}????????????????//?Success!?Ensure?backoff?is?cancelled.

currInterval?=?time.Duration(n.RetryInterval)????????????????//?Update?how?many?bytes?we've?sent

limiter.Update(c)????????????????//?Block?to?maintain?the?throughput?rate

time.Sleep(limiter.Delay())

}

}

}

}數(shù)據(jù)的本地存儲和讀取

5.1 定義在services/hh/queue.go,所有的segment file在內(nèi)存中組織成一個隊列,讀從head指向的segment讀取,寫入到tail指向的segment, 每個segment文件的最后8字節(jié)記錄當前segment文件已經(jīng)讀到什么位置

5.2 清理,當這個segment文件內(nèi)容都發(fā)送完當前文件會被刪除,周期性清理每次只會check當前head指向的segment是否需要清理掉

作者:掃帚的影子

鏈接:https://www.jianshu.com/p/6a94486b2daa

總結(jié)

以上是生活随笔為你收集整理的influxdb数据过期_Influxdb Cluster下的数据写入的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。