最近在看源碼,發(fā)現(xiàn)好多地方用到了這個(gè)semaphore
。
本文是在go version go1.13.15 darwin/amd64
上進(jìn)行的
下面是官方的描述
// Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case // of other synchronization primitives. // Thus it targets the same goal as Linux's futex, // but it has much simpler semantics. // // That is, don't think of these as semaphores. // Think of them as a way to implement sleep and wakeup // such that every sleep is paired with a single wakeup, // even if, due to races, the wakeup happens before the sleep. // 具體的用法是提供 sleep 和 wakeup 原語 // 以使其能夠在其它同步原語中的競(jìng)爭(zhēng)情況下使用 // 因此這里的 semaphore 和 Linux 中的 futex 目標(biāo)是一致的 // 只不過語義上更簡(jiǎn)單一些 // // 也就是說,不要認(rèn)為這些是信號(hào)量 // 把這里的東西看作 sleep 和 wakeup 實(shí)現(xiàn)的一種方式 // 每一個(gè) sleep 都會(huì)和一個(gè) wakeup 配對(duì) // 即使在發(fā)生 race 時(shí),wakeup 在 sleep 之前時(shí)也是如此
上面提到了和futex
作用一樣,關(guān)于futex
futex(快速用戶區(qū)互斥的簡(jiǎn)稱)是一個(gè)在Linux上實(shí)現(xiàn)鎖定和構(gòu)建高級(jí)抽象鎖如信號(hào)量和POSIX互斥的基本工具
Futex 由一塊能夠被多個(gè)進(jìn)程共享的內(nèi)存空間(一個(gè)對(duì)齊后的整型變量)組成;這個(gè)整型變量的值能夠通過匯編語言調(diào)用CPU提供的原子操作指令來增加或減少,并且一個(gè)進(jìn)程可以等待直到那個(gè)值變成正數(shù)。Futex 的操作幾乎全部在用戶空間完成;只有當(dāng)操作結(jié)果不一致從而需要仲裁時(shí),才需要進(jìn)入操作系統(tǒng)內(nèi)核空間執(zhí)行。這種機(jī)制允許使用 futex 的鎖定原語有非常高的執(zhí)行效率:由于絕大多數(shù)的操作并不需要在多個(gè)進(jìn)程之間進(jìn)行仲裁,所以絕大多數(shù)操作都可以在應(yīng)用程序空間執(zhí)行,而不需要使用(相對(duì)高代價(jià)的)內(nèi)核系統(tǒng)調(diào)用。
go中的semaphore
作用和futex
目標(biāo)一樣,提供sleep
和wakeup
原語,使其能夠在其它同步原語中的競(jìng)爭(zhēng)情況下使用。當(dāng)一個(gè)goroutine
需要休眠時(shí),將其進(jìn)行集中存放,當(dāng)需要wakeup
時(shí),再將其取出,重新放入調(diào)度器中。
例如在讀寫鎖的實(shí)現(xiàn)中,讀鎖和寫鎖之前的相互阻塞喚醒,就是通過sleep
和wakeup
實(shí)現(xiàn),當(dāng)有讀鎖存在的時(shí)候,新加入的寫鎖通過semaphore
阻塞自己,當(dāng)前面的讀鎖完成,在通過semaphore
喚醒被阻塞的寫鎖。
寫鎖
// 獲取互斥鎖 // 阻塞等待所有讀操作結(jié)束(如果有的話) func (rw *RWMutex) Lock() { ... // 原子的修改readerCount的值,直接將readerCount減去rwmutexMaxReaders // 說明,有寫鎖進(jìn)來了,這在上面的讀鎖中也有體現(xiàn) r := atomic.AddInt32(rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 當(dāng)r不為0說明,當(dāng)前寫鎖之前有讀鎖的存在 // 修改下readerWait,也就是當(dāng)前寫鎖需要等待的讀鎖的個(gè)數(shù) if r != 0 atomic.AddInt32(rw.readerWait, r) != 0 { // 阻塞當(dāng)前寫鎖 runtime_SemacquireMutex(rw.writerSem, false, 0) } ... }
通過runtime_SemacquireMutex
對(duì)當(dāng)前寫鎖進(jìn)行sleep
讀鎖釋放
// 減少讀操作計(jì)數(shù),即readerCount-- // 喚醒等待寫操作的協(xié)程(如果有的話) func (rw *RWMutex) RUnlock() { ... // 首先通過atomic的原子性使readerCount-1 // 1.若readerCount大于0, 證明當(dāng)前還有讀鎖, 直接結(jié)束本次操作 // 2.若readerCount小于0, 證明已經(jīng)沒有讀鎖, 但是還有因?yàn)樽x鎖被阻塞的寫鎖存在 if r := atomic.AddInt32(rw.readerCount, -1); r 0 { // 嘗試喚醒被阻塞的寫鎖 rw.rUnlockSlow(r) } ... } func (rw *RWMutex) rUnlockSlow(r int32) { ... // readerWait--操作,如果readerWait--操作之后的值為0,說明,寫鎖之前,已經(jīng)沒有讀鎖了 // 通過writerSem信號(hào)量,喚醒隊(duì)列中第一個(gè)阻塞的寫鎖 if atomic.AddInt32(rw.readerWait, -1) == 0 { // 喚醒一個(gè)寫鎖 runtime_Semrelease(rw.writerSem, false, 1) } }
寫鎖處理完之后,調(diào)用runtime_Semrelease
來喚醒sleep
的寫鎖
在go/src/sync/runtime.go
中,定義了這幾個(gè)方法
// Semacquire等待*s > 0,然后原子遞減它。 // 它是一個(gè)簡(jiǎn)單的睡眠原語,用于同步 // library and不應(yīng)該直接使用。 func runtime_Semacquire(s *uint32) // SemacquireMutex類似于Semacquire,用來阻塞互斥的對(duì)象 // 如果lifo為true,waiter將會(huì)被插入到隊(duì)列的頭部 // skipframes是跟蹤過程中要省略的幀數(shù),從這里開始計(jì)算 // runtime_SemacquireMutex's caller. func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) // Semrelease會(huì)自動(dòng)增加*s并通知一個(gè)被Semacquire阻塞的等待的goroutine // 它是一個(gè)簡(jiǎn)單的喚醒原語,用于同步 // library and不應(yīng)該直接使用。 // 如果handoff為true, 傳遞信號(hào)到隊(duì)列頭部的waiter // skipframes是跟蹤過程中要省略的幀數(shù),從這里開始計(jì)算 // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
具體的實(shí)現(xiàn)是在go/src/runtime/sema.go
中
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) }
semaphore
的實(shí)現(xiàn)使用到了sudog
,我們先來看下
sudog 是運(yùn)行時(shí)用來存放處于阻塞狀態(tài)的goroutine
的一個(gè)上層抽象,是用來實(shí)現(xiàn)用戶態(tài)信號(hào)量的主要機(jī)制之一。 例如當(dāng)一個(gè)goroutine
因?yàn)榈却?code>channel的數(shù)據(jù)需要進(jìn)行阻塞時(shí),sudog
會(huì)將goroutine
及其用于等待數(shù)據(jù)的位置進(jìn)行記錄, 并進(jìn)而串聯(lián)成一個(gè)等待隊(duì)列,或二叉平衡樹。
// sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // 以下字段受hchan保護(hù) g *g // isSelect 表示 g 正在參與一個(gè) select, so // 因此 g.selectDone 必須以 CAS 的方式來獲取wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // 數(shù)據(jù)元素(可能指向棧) // 以下字段不會(huì)并發(fā)訪問。 // 對(duì)于通道,waitlink只被g訪問。 // 對(duì)于信號(hào)量,所有字段(包括上面的字段) // 只有當(dāng)持有一個(gè)semroot鎖時(shí)才被訪問。 acquiretime int64 releasetime int64 ticket uint32 parent *sudog //semaRoot 二叉樹 waitlink *sudog // g.waiting 列表或 semaRoot waittail *sudog // semaRoot c *hchan // channel }
sudog
的獲取和歸還,遵循以下策略:
1、獲取,首先從per-P
緩存獲取,對(duì)于per-P
緩存,如果per-P
緩存為空,則從全局池抓取一半,然后取出per-P
緩存中的最后一個(gè);
2、歸還,歸還到per-P
緩存,如果per-P
緩存滿了,就把per-P
緩存的一半歸還到全局緩存中,然后歸還sudog
到per-P
緩存中。
1、如果per-P
緩存的內(nèi)容沒達(dá)到長(zhǎng)度的一般,則會(huì)從全局額緩存中抓取一半;
2、然后返回把per-P
緩存中最后一個(gè)sudog
返回,并且置空;
// go/src/runtime/proc.go //go:nosplit func acquireSudog() *sudog { // Delicate dance: 信號(hào)量的實(shí)現(xiàn)調(diào)用acquireSudog,然后acquireSudog調(diào)用new(sudog) // new調(diào)用malloc, malloc調(diào)用垃圾收集器,垃圾收集器在stopTheWorld調(diào)用信號(hào)量 // 通過在new(sudog)周圍執(zhí)行acquirem/releasem來打破循環(huán) // acquirem/releasem在new(sudog)期間增加m.locks,防止垃圾收集器被調(diào)用。 // 獲取當(dāng)前 g 所在的 m mp := acquirem() // 獲取p的指針 pp := mp.p.ptr() if len(pp.sudogcache) == 0 { lock(sched.sudoglock) // 首先,嘗試從中央緩存獲取一批數(shù)據(jù)。 for len(pp.sudogcache) cap(pp.sudogcache)/2 sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(sched.sudoglock) // 如果中央緩存中沒有,新分配 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } // 取緩存中最后一個(gè) n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil // 將剛?cè)〕龅脑诰彺嬷幸瞥? pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s }
1、如果per-P
緩存滿了,就歸還per-P
緩存一般的內(nèi)容到全局緩存;
2、然后將回收的sudog
放到per-P
緩存中。
// go/src/runtime/proc.go //go:nosplit func releaseSudog(s *sudog) { if s.elem != nil { throw("runtime: sudog with non-nil elem") } if s.isSelect { throw("runtime: sudog with non-false isSelect") } if s.next != nil { throw("runtime: sudog with non-nil next") } if s.prev != nil { throw("runtime: sudog with non-nil prev") } if s.waitlink != nil { throw("runtime: sudog with non-nil waitlink") } if s.c != nil { throw("runtime: sudog with non-nil c") } gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } // 避免重新安排到另一個(gè)P mp := acquirem() // avoid rescheduling to another P pp := mp.p.ptr() // 如果緩存滿了 if len(pp.sudogcache) == cap(pp.sudogcache) { // 將本地高速緩存的一半傳輸?shù)街醒敫咚倬彺? var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(sched.sudoglock) } // 歸還sudog到`per-P`緩存中 pp.sudogcache = append(pp.sudogcache, s) releasem(mp) }
// go/src/runtime/sema.go // 用于sync.Mutex的異步信號(hào)量。 // semaRoot擁有一個(gè)具有不同地址(s.elem)的sudog平衡樹。 // 每個(gè)sudog都可以依次(通過s.waitlink)指向一個(gè)列表,在相同地址上等待的其他sudog。 // 對(duì)具有相同地址的sudog內(nèi)部列表進(jìn)行的操作全部為O(1)。頂層semaRoot列表的掃描為O(log n), // 其中,n是阻止goroutines的不同地址的數(shù)量,通過他們散列到給定的semaRoot。 type semaRoot struct { lock mutex // waiters的平衡樹的根節(jié)點(diǎn) treap *sudog // waiters的數(shù)量,讀取的時(shí)候無所 nwait uint32 } // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte }
// go/src/runtime/sema.go //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { // 判斷這個(gè)goroutine,是否是m上正在運(yùn)行的那個(gè) gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // *addr -= 1 if cansemacquire(addr) { return } // 增加等待計(jì)數(shù) // 再試一次 cansemacquire 如果成功則直接返回 // 將自己作為等待者入隊(duì) // 休眠 // (等待器描述符由出隊(duì)信號(hào)產(chǎn)生出隊(duì)行為) // 獲取一個(gè)sudog s := acquireSudog() root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profilesemaBlockProfile != 0 blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profilesemaMutexProfile != 0 mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } for { lock(root.lock) // 添加我們自己到nwait來禁用semrelease中的"easy case" atomic.Xadd(root.nwait, 1) // 檢查cansemacquire避免錯(cuò)過喚醒 if cansemacquire(addr) { atomic.Xadd(root.nwait, -1) unlock(root.lock) break } // 任何在 cansemacquire 之后的 semrelease 都知道我們?cè)诘却ㄒ驗(yàn)樵O(shè)置了 nwait),因此休眠 // 隊(duì)列將s添加到semaRoot中被阻止的goroutine中 root.queue(addr, s, lifo) // 將當(dāng)前goroutine置于等待狀態(tài)并解鎖鎖。 // 通過調(diào)用goready(gp),可以使goroutine再次可運(yùn)行。 goparkunlock(root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } // 歸還sudog releaseSudog(s) } func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
// go/src/runtime/sema.go //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semroot(addr) atomic.Xadd(addr, 1) // Easy case:沒有等待者 // 這個(gè)檢查必須發(fā)生在xadd之后,以避免錯(cuò)過喚醒 if atomic.Load(root.nwait) == 0 { return } // Harder case: 找到等待者,并且喚醒 lock(root.lock) if atomic.Load(root.nwait) == 0 { // 該計(jì)數(shù)已被另一個(gè)goroutine占用, // 因此無需喚醒其他goroutine。 unlock(root.lock) return } // 搜索一個(gè)等待著然后將其喚醒 s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(root.nwait, -1) } unlock(root.lock) if s != nil { // 可能會(huì)很慢,因此先解鎖 acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3+skipframes) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff cansemacquire(addr) { s.ticket = 1 } // goready(s.g, 5) // 標(biāo)記 runnable,等待被重新調(diào)度 readyWithTime(s, 5+skipframes) } }
摘自"同步原語"的一段總結(jié)
這一對(duì) semacquire 和 semrelease 理解上可能不太直觀。 首先,我們必須意識(shí)到這兩個(gè)函數(shù)一定是在兩個(gè)不同的 M(線程)上得到執(zhí)行,否則不會(huì)出現(xiàn)并發(fā),我們不妨設(shè)為 M1 和 M2。 當(dāng) M1 上的 G1 執(zhí)行到 semacquire1 時(shí),如果快速路徑成功,則說明 G1 搶到鎖,能夠繼續(xù)執(zhí)行。但一旦失敗且在慢速路徑下 依然搶不到鎖,則會(huì)進(jìn)入 goparkunlock,將當(dāng)前的 G1 放到等待隊(duì)列中,進(jìn)而讓 M1 切換并執(zhí)行其他 G。 當(dāng) M2 上的 G2 開始調(diào)用 semrelease1 時(shí),只是單純的將等待隊(duì)列的 G1 重新放到調(diào)度隊(duì)列中,而當(dāng) G1 重新被調(diào)度時(shí)(假設(shè)運(yùn)氣好又在 M1 上被調(diào)度),代碼仍然會(huì)從 goparkunlock 之后開始執(zhí)行,并再次嘗試競(jìng)爭(zhēng)信號(hào)量,如果成功,則會(huì)歸還 sudog。
參考
【同步原語】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go并發(fā)編程實(shí)戰(zhàn)--信號(hào)量的使用方法和其實(shí)現(xiàn)原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【進(jìn)程同步之信號(hào)量機(jī)制(pv操作)及三個(gè)經(jīng)典同步問題】https://blog.csdn.net/SpeedMe/article/details/17597373
到此這篇關(guān)于go中semaphore(信號(hào)量)源碼解讀的文章就介紹到這了,更多相關(guān)go中semaphore源碼內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:儋州 海南 電子產(chǎn)品 安康 物業(yè)服務(wù) 西雙版納 青海 遼寧
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《一文讀懂go中semaphore(信號(hào)量)源碼》,本文關(guān)鍵詞 一文,讀懂,中,semaphore,信號(hào),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。