日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > linux >内容正文

linux

Linux 信号量 源码,一文读懂go中semaphore(信号量)源码

發布時間:2024/4/11 linux 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux 信号量 源码,一文读懂go中semaphore(信号量)源码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

運行時信號量機制 semaphore

前言

最近在看源碼,發現好多地方用到了這個semaphore。

本文是在go version go1.13.15 darwin/amd64上進行的

作用是什么

下面是官方的描述

// Semaphore implementation exposed to Go.

// Intended use is provide a sleep and wakeup

// primitive that can be used in the contended case

// of other synchronization primitives.

// Thus it targets the same goal as Linux's futex,

// but it has much simpler semantics.

//

// That is, don't think of these as semaphores.

// Think of them as a way to implement sleep and wakeup

// such that every sleep is paired with a single wakeup,

// even if, due to races, the wakeup happens before the sleep.

// 具體的用法是提供 sleep 和 wakeup 原語

// 以使其能夠在其它同步原語中的競爭情況下使用

// 因此這里的 semaphore 和 Linux 中的 futex 目標是一致的

// 只不過語義上更簡單一些

//

// 也就是說,不要認為這些是信號量

// 把這里的東西看作 sleep 和 wakeup 實現的一種方式

// 每一個 sleep 都會和一個 wakeup 配對

// 即使在發生 race 時,wakeup 在 sleep 之前時也是如此

上面提到了和futex作用一樣,關于futex

futex(快速用戶區互斥的簡稱)是一個在Linux上實現鎖定和構建高級抽象鎖如信號量和POSIX互斥的基本工具

Futex 由一塊能夠被多個進程共享的內存空間(一個對齊后的整型變量)組成;這個整型變量的值能夠通過匯編語言調用CPU提供的原子操作指令來增加或減少,并且一個進程可以等待直到那個值變成正數。Futex 的操作幾乎全部在用戶空間完成;只有當操作結果不一致從而需要仲裁時,才需要進入操作系統內核空間執行。這種機制允許使用 futex 的鎖定原語有非常高的執行效率:由于絕大多數的操作并不需要在多個進程之間進行仲裁,所以絕大多數操作都可以在應用程序空間執行,而不需要使用(相對高代價的)內核系統調用。

go中的semaphore作用和futex目標一樣,提供sleep和wakeup原語,使其能夠在其它同步原語中的競爭情況下使用。當一個goroutine需要休眠時,將其進行集中存放,當需要wakeup時,再將其取出,重新放入調度器中。

例如在讀寫鎖的實現中,讀鎖和寫鎖之前的相互阻塞喚醒,就是通過sleep和wakeup實現,當有讀鎖存在的時候,新加入的寫鎖通過semaphore阻塞自己,當前面的讀鎖完成,在通過semaphore喚醒被阻塞的寫鎖。

寫鎖

// 獲取互斥鎖

// 阻塞等待所有讀操作結束(如果有的話)

func (rw *RWMutex) Lock() {

...

// 原子的修改readerCount的值,直接將readerCount減去rwmutexMaxReaders

// 說明,有寫鎖進來了,這在上面的讀鎖中也有體現

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

// 當r不為0說明,當前寫鎖之前有讀鎖的存在

// 修改下readerWait,也就是當前寫鎖需要等待的讀鎖的個數

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {

// 阻塞當前寫鎖

runtime_SemacquireMutex(&rw.writerSem, false, 0)

}

...

}

通過runtime_SemacquireMutex對當前寫鎖進行sleep

讀鎖釋放

// 減少讀操作計數,即readerCount--

// 喚醒等待寫操作的協程(如果有的話)

func (rw *RWMutex) RUnlock() {

...

// 首先通過atomic的原子性使readerCount-1

// 1.若readerCount大于0, 證明當前還有讀鎖, 直接結束本次操作

// 2.若readerCount小于0, 證明已經沒有讀鎖, 但是還有因為讀鎖被阻塞的寫鎖存在

if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {

// 嘗試喚醒被阻塞的寫鎖

rw.rUnlockSlow(r)

}

...

}

func (rw *RWMutex) rUnlockSlow(r int32) {

...

// readerWait--操作,如果readerWait--操作之后的值為0,說明,寫鎖之前,已經沒有讀鎖了

// 通過writerSem信號量,喚醒隊列中第一個阻塞的寫鎖

if atomic.AddInt32(&rw.readerWait, -1) == 0 {

// 喚醒一個寫鎖

runtime_Semrelease(&rw.writerSem, false, 1)

}

}

寫鎖處理完之后,調用runtime_Semrelease來喚醒sleep的寫鎖

幾個主要的方法

在go/src/sync/runtime.go中,定義了這幾個方法

// Semacquire等待*s > 0,然后原子遞減它。

// 它是一個簡單的睡眠原語,用于同步

// library and不應該直接使用。

func runtime_Semacquire(s *uint32)

// SemacquireMutex類似于Semacquire,用來阻塞互斥的對象

// 如果lifo為true,waiter將會被插入到隊列的頭部

// skipframes是跟蹤過程中要省略的幀數,從這里開始計算

// runtime_SemacquireMutex's caller.

func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease會自動增加*s并通知一個被Semacquire阻塞的等待的goroutine

// 它是一個簡單的喚醒原語,用于同步

// library and不應該直接使用。

// 如果handoff為true, 傳遞信號到隊列頭部的waiter

// skipframes是跟蹤過程中要省略的幀數,從這里開始計算

// runtime_Semrelease's caller.

func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

具體的實現是在go/src/runtime/sema.go中

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire

func sync_runtime_Semacquire(addr *uint32) {

semacquire1(addr, false, semaBlockProfile, 0)

}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease

func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {

semrelease1(addr, handoff, skipframes)

}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {

semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)

}

如何實現

sudog 緩存

semaphore的實現使用到了sudog,我們先來看下

sudog 是運行時用來存放處于阻塞狀態的goroutine的一個上層抽象,是用來實現用戶態信號量的主要機制之一。 例如當一個goroutine因為等待channel的數據需要進行阻塞時,sudog會將goroutine及其用于等待數據的位置進行記錄, 并進而串聯成一個等待隊列,或二叉平衡樹。

// sudogs are allocated from a special pool. Use acquireSudog and

// releaseSudog to allocate and free them.

type sudog struct {

// 以下字段受hchan保護

g *g

// isSelect 表示 g 正在參與一個 select, so

// 因此 g.selectDone 必須以 CAS 的方式來獲取wake-up race.

isSelect bool

next *sudog

prev *sudog

elem unsafe.Pointer // 數據元素(可能指向棧)

// 以下字段不會并發訪問。

// 對于通道,waitlink只被g訪問。

// 對于信號量,所有字段(包括上面的字段)

// 只有當持有一個semroot鎖時才被訪問。

acquiretime int64

releasetime int64

ticket uint32

parent *sudog //semaRoot 二叉樹

waitlink *sudog // g.waiting 列表或 semaRoot

waittail *sudog // semaRoot

c *hchan // channel

}

sudog的獲取和歸還,遵循以下策略:

1、獲取,首先從per-P緩存獲取,對于per-P緩存,如果per-P緩存為空,則從全局池抓取一半,然后取出per-P緩存中的最后一個;

2、歸還,歸還到per-P緩存,如果per-P緩存滿了,就把per-P緩存的一半歸還到全局緩存中,然后歸還sudog到per-P緩存中。

acquireSudog

1、如果per-P緩存的內容沒達到長度的一般,則會從全局額緩存中抓取一半;

2、然后返回把per-P緩存中最后一個sudog返回,并且置空;

// go/src/runtime/proc.go

//go:nosplit

func acquireSudog() *sudog {

// Delicate dance: 信號量的實現調用acquireSudog,然后acquireSudog調用new(sudog)

// new調用malloc, malloc調用垃圾收集器,垃圾收集器在stopTheWorld調用信號量

// 通過在new(sudog)周圍執行acquirem/releasem來打破循環

// acquirem/releasem在new(sudog)期間增加m.locks,防止垃圾收集器被調用。

// 獲取當前 g 所在的 m

mp := acquirem()

// 獲取p的指針

pp := mp.p.ptr()

if len(pp.sudogcache) == 0 {

lock(&sched.sudoglock)

// 首先,嘗試從中央緩存獲取一批數據。

for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {

s := sched.sudogcache

sched.sudogcache = s.next

s.next = nil

pp.sudogcache = append(pp.sudogcache, s)

}

unlock(&sched.sudoglock)

// 如果中央緩存中沒有,新分配

if len(pp.sudogcache) == 0 {

pp.sudogcache = append(pp.sudogcache, new(sudog))

}

}

// 取緩存中最后一個

n := len(pp.sudogcache)

s := pp.sudogcache[n-1]

pp.sudogcache[n-1] = nil

// 將剛取出的在緩存中移除

pp.sudogcache = pp.sudogcache[:n-1]

if s.elem != nil {

throw("acquireSudog: found s.elem != nil in cache")

}

releasem(mp)

return s

}

releaseSudog

1、如果per-P緩存滿了,就歸還per-P緩存一般的內容到全局緩存;

2、然后將回收的sudog放到per-P緩存中。

// go/src/runtime/proc.go

//go:nosplit

func releaseSudog(s *sudog) {

if s.elem != nil {

throw("runtime: sudog with non-nil elem")

}

if s.isSelect {

throw("runtime: sudog with non-false isSelect")

}

if s.next != nil {

throw("runtime: sudog with non-nil next")

}

if s.prev != nil {

throw("runtime: sudog with non-nil prev")

}

if s.waitlink != nil {

throw("runtime: sudog with non-nil waitlink")

}

if s.c != nil {

throw("runtime: sudog with non-nil c")

}

gp := getg()

if gp.param != nil {

throw("runtime: releaseSudog with non-nil gp.param")

}

// 避免重新安排到另一個P

mp := acquirem() // avoid rescheduling to another P

pp := mp.p.ptr()

// 如果緩存滿了

if len(pp.sudogcache) == cap(pp.sudogcache) {

// 將本地高速緩存的一半傳輸到中央高速緩存

var first, last *sudog

for len(pp.sudogcache) > cap(pp.sudogcache)/2 {

n := len(pp.sudogcache)

p := pp.sudogcache[n-1]

pp.sudogcache[n-1] = nil

pp.sudogcache = pp.sudogcache[:n-1]

if first == nil {

first = p

} else {

last.next = p

}

last = p

}

lock(&sched.sudoglock)

last.next = sched.sudogcache

sched.sudogcache = first

unlock(&sched.sudoglock)

}

// 歸還sudog到`per-P`緩存中

pp.sudogcache = append(pp.sudogcache, s)

releasem(mp)

}

semaphore

// go/src/runtime/sema.go

// 用于sync.Mutex的異步信號量。

// semaRoot擁有一個具有不同地址(s.elem)的sudog平衡樹。

// 每個sudog都可以依次(通過s.waitlink)指向一個列表,在相同地址上等待的其他sudog。

// 對具有相同地址的sudog內部列表進行的操作全部為O(1)。頂層semaRoot列表的掃描為O(log n),

// 其中,n是阻止goroutines的不同地址的數量,通過他們散列到給定的semaRoot。

type semaRoot struct {

lock mutex

// waiters的平衡樹的根節點

treap *sudog

// waiters的數量,讀取的時候無所

nwait uint32

}

// Prime to not correlate with any user patterns.

const semTabSize = 251

var semtable [semTabSize]struct {

root semaRoot

pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte

}

poll_runtime_Semacquire/sync_runtime_SemacquireMutex

// go/src/runtime/sema.go

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire

func poll_runtime_Semacquire(addr *uint32) {

semacquire1(addr, false, semaBlockProfile, 0)

}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {

semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)

}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {

// 判斷這個goroutine,是否是m上正在運行的那個

gp := getg()

if gp != gp.m.curg {

throw("semacquire not on the G stack")

}

// *addr -= 1

if cansemacquire(addr) {

return

}

// 增加等待計數

// 再試一次 cansemacquire 如果成功則直接返回

// 將自己作為等待者入隊

// 休眠

// (等待器描述符由出隊信號產生出隊行為)

// 獲取一個sudog

s := acquireSudog()

root := semroot(addr)

t0 := int64(0)

s.releasetime = 0

s.acquiretime = 0

s.ticket = 0

if profile&semaBlockProfile != 0 && blockprofilerate > 0 {

t0 = cputicks()

s.releasetime = -1

}

if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {

if t0 == 0 {

t0 = cputicks()

}

s.acquiretime = t0

}

for {

lock(&root.lock)

// 添加我們自己到nwait來禁用semrelease中的"easy case"

atomic.Xadd(&root.nwait, 1)

// 檢查cansemacquire避免錯過喚醒

if cansemacquire(addr) {

atomic.Xadd(&root.nwait, -1)

unlock(&root.lock)

break

}

// 任何在 cansemacquire 之后的 semrelease 都知道我們在等待(因為設置了 nwait),因此休眠

// 隊列將s添加到semaRoot中被阻止的goroutine中

root.queue(addr, s, lifo)

// 將當前goroutine置于等待狀態并解鎖鎖。

// 通過調用goready(gp),可以使goroutine再次可運行。

goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)

if s.ticket != 0 || cansemacquire(addr) {

break

}

}

if s.releasetime > 0 {

blockevent(s.releasetime-t0, 3+skipframes)

}

// 歸還sudog

releaseSudog(s)

}

func cansemacquire(addr *uint32) bool {

for {

v := atomic.Load(addr)

if v == 0 {

return false

}

if atomic.Cas(addr, v, v-1) {

return true

}

}

}

sync_runtime_Semrelease

// go/src/runtime/sema.go

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease

func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {

semrelease1(addr, handoff, skipframes)

}

func semrelease1(addr *uint32, handoff bool, skipframes int) {

root := semroot(addr)

atomic.Xadd(addr, 1)

// Easy case:沒有等待者

// 這個檢查必須發生在xadd之后,以避免錯過喚醒

if atomic.Load(&root.nwait) == 0 {

return

}

// Harder case: 找到等待者,并且喚醒

lock(&root.lock)

if atomic.Load(&root.nwait) == 0 {

// 該計數已被另一個goroutine占用,

// 因此無需喚醒其他goroutine。

unlock(&root.lock)

return

}

// 搜索一個等待著然后將其喚醒

s, t0 := root.dequeue(addr)

if s != nil {

atomic.Xadd(&root.nwait, -1)

}

unlock(&root.lock)

if s != nil { // 可能會很慢,因此先解鎖

acquiretime := s.acquiretime

if acquiretime != 0 {

mutexevent(t0-acquiretime, 3+skipframes)

}

if s.ticket != 0 {

throw("corrupted semaphore ticket")

}

if handoff && cansemacquire(addr) {

s.ticket = 1

}

// goready(s.g, 5)

// 標記 runnable,等待被重新調度

readyWithTime(s, 5+skipframes)

}

}

摘自"同步原語"的一段總結

這一對 semacquire 和 semrelease 理解上可能不太直觀。 首先,我們必須意識到這兩個函數一定是在兩個不同的 M(線程)上得到執行,否則不會出現并發,我們不妨設為 M1 和 M2。 當 M1 上的 G1 執行到 semacquire1 時,如果快速路徑成功,則說明 G1 搶到鎖,能夠繼續執行。但一旦失敗且在慢速路徑下 依然搶不到鎖,則會進入 goparkunlock,將當前的 G1 放到等待隊列中,進而讓 M1 切換并執行其他 G。 當 M2 上的 G2 開始調用 semrelease1 時,只是單純的將等待隊列的 G1 重新放到調度隊列中,而當 G1 重新被調度時(假設運氣好又在 M1 上被調度),代碼仍然會從 goparkunlock 之后開始執行,并再次嘗試競爭信號量,如果成功,則會歸還 sudog。

參考

到此這篇關于go中semaphore(信號量)源碼解讀的文章就介紹到這了,更多相關go中semaphore源碼內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的Linux 信号量 源码,一文读懂go中semaphore(信号量)源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。