无锁数据结构三:无锁数据结构的两大问题
實現無鎖數據結構最困難的兩個問題是ABA問題和內存回收問題。它們之間存在著一定的關聯:一般內存回收問題的解決方案,可以作為解決ABA問題的一種只需很少開銷或者根本不需額外開銷的方法,但也存在一些情況并不可行,如兩個鏈表實現的棧,不斷在兩個棧間交換節點。下面對兩個問題的主流解決方法進行介紹。
標簽指針(Tagged pointers)
標簽指針作為一種規范由IBM引入,旨在解決ABA問題。從某一方面來看,ABA問題的出現是由于不能區分前一個A與后一個A,標簽指針通過標簽(版本號)來解決。其每個指針由一組原子性的內存單元和標簽(32比特的整數)組成。
template <typename T> struct tagged_ptr {T * ptr ;unsigned int tag ;tagged_ptr(): ptr(nullptr), tag(0) {}tagged_ptr( T * p ): ptr(p), tag(0) {}tagged_ptr( T * p, unsigned int n ): ptr(p), tag(n){}T * operator->() const { return ptr; } };- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
標簽作為一個版本號,隨著標簽指針上的每次CAS運算增加,并且只增不減。一旦需要從容器中非物理地移除某個元素,就應將其放入一個放置空閑元素的列表中。在空閑元素列表中,邏輯刪除的元素完全有可能被再次調用。因為是無鎖數據結構,一個線程刪除X元素,另外一個線程依然可以持有標簽指針的本地副本,并指向元素字段。因此需要一個針對每種T類型的空閑元素列表。多數情況下,將元素放入空閑列表中,意味著調用這個T類型數據的析構函數是非法的(考慮到并行訪問,在析構函數運算的過程中,其它線程是可以讀到此元素的數據)。
有了標簽,A在CAS時雖然內存地址相同,但若A已被使用過,其標簽不同。不會出現ABA問題。
當然其必然也有缺點:
其一:要實現標簽指針需要平臺支持,平臺需要支持dwCAS(地址指針需一個word,標簽至少需32位),由于現代的系統架構都有一套完整的64位指令集,對于32位系統,dwCAS需要64位,可以支持;但對于64位系統,dwCAS需要128位(至少96位),并不是所有架構都能支持。
The scheme is implemented at platforms, which have an atomic CAS primitive over a double word (dwCAS). This requirement is fulfilled for 32-bit modern systems as dwCAS operates with 64-bit words while all modern architectures have a complete set of 64-bit instructions. But 128-bit (or at least 96-bit) dwCAS is required for 64-bit operation mode. It isn’t implemented in all architectures.
其二:空閑列表通常以無鎖棧或無鎖隊列的方式實現,對性能也會有影響,但也正是因為使用無鎖,導致其性能有提高(相對于有鎖)。?
其三: 對于每種類型提供單獨的空閑列表,這樣做太過奢侈難以被大眾所接收,一些應用使用內存太過低效。例如,無鎖隊列通常包含10個元素,但可以擴展到百萬,比如在一次阻塞后,空閑列表擴展至百萬。
Availability of a separate free-list for every data type can be an unattainable luxury for some applications as it can lead to inefficient memory use. For example, if a lock-free queue consists of 10 elements on the average but its size can increase up to 1 million, the free-list size after the spike can be about 1 million. Such behavior is often illegal.
Example
詳情可參考1
template <typename T> struct node {tagged_ptr next;T data; } ; template <typename T> class MSQueue {tagged_ptr<T> volatile m_Head;tagged_ptr<T> volatile m_Tail;FreeList m_FreeList; public:MSQueue(){// Allocate dummy node// Head & Tail point to dummy nodem_Head.ptr = m_Tail.ptr = new node();} void enqueue( T const& value ) { E1: node * pNode = m_FreeList.newNode(); E2: pNode–>data = value; E3: pNode–>next.ptr = nullptr; E4: for (;;) { E5: tagged_ptr<T> tail = m_Tail; E6: tagged_ptr<T> next = tail.ptr–>next; E7: if tail == Q–>Tail {// Does Tail point to the last element? E8: if next.ptr == nullptr {// Trying to add the element in the end of the list E9: if CAS(&tail.ptr–>next, next, tagged_ptr<T>(node, next.tag+1)) {// Success, leave the loop E10: break;} E11: } else {// Tail doesn’t point to the last element// Trying to relocate tail to the last element E12: CAS(&m_Tail, tail, tagged_ptr<T>(next.ptr, tail.tag+1));}}} // end loop// Trying to relocate tail to the inserted element E13: CAS(&m_Tail, tail, tagged_ptr<T>(pNode, tail.tag+1));} bool dequeue( T& dest ) { D1: for (;;) { D2: tagged_ptr<T> head = m_Head; D3: tagged_ptr<T> tail = m_Tail; D4: tagged_ptr<T> next = head–>next;// Head, tail and next consistent? D5: if ( head == m_Head ) {// Is queue empty or isn’t tail the last? D6: if ( head.ptr == tail.ptr ) {// Is the queue empty? D7: if (next.ptr == nullptr ) {// The queue is empty D8: return false;}// Tail isn’t at the last element// Trying to move tail forward D9: CAS(&m_Tail, tail, tagged_ptr<T>(next.ptr, tail.tag+1>)); D10: } else { // Tail is in position// Read the value before CAS, as otherwise // another dequeue can deallocate next D11: dest = next.ptr–>data;// Trying to move head forward D12: if (CAS(&m_Head, head, tagged_ptr<T>(next.ptr, head.tag+1)) D13: break // Success, leave the loop}}} // end of loop// Deallocate the old dummy node D14: m_FreeList.add(head.ptr); D15: return true; // the result is in dest}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
險象指針(Hazard pointer)
此規則由Michael創建(Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects),用來保護無鎖數據結構中元素的局部引用,也即延遲刪除。
it’s the most popular and studied scheme of delayed deletion.
此規則的實現僅依賴原子性讀寫,而未采用任何重量級的CAS同步原語。其實現原理大致如下:
Note:Plist 一寫多讀;Dlist單寫單讀。
hazard pointer的使用是要結合具體的數據結構的,我們需要分析所要保護的數據結構的每一步操作,找出需要保護的內存對象并使用hazard pointer替換普通指針對危險的內存訪問進行保護。
Example
詳情可參考3
template <typename T> void Queue<T>::enqueue(const T &data) {qnode *node = new qnode();node->data = data;node->next = NULL;// qnode *t = NULL;HazardPointer<qnode> t(hazard_mgr_);qnode *next = NULL;while (true) {if (!t.acquire(&tail_)) {continue;}next = t->next;if (next) {__sync_bool_compare_and_swap(&tail_, t, next);continue;}if (__sync_bool_compare_and_swap(&t->next, NULL, node)) {break;}}__sync_bool_compare_and_swap(&tail_, t, node); }template <typename T> bool Queue<T>::dequeue(T &data) {qnode *t = NULL;// qnode *h = NULL;HazardPointer<qnode> h(hazard_mgr_);// qnode *next = NULL;HazardPointer<qnode> next(hazard_mgr_);while (true) {if (!h.acquire(&head_)) {continue;}t = tail_;next.acquire(&h->next);asm volatile("" ::: "memory");if (head_ != h) {continue;}if (!next) {return false;}if (h == t) {__sync_bool_compare_and_swap(&tail_, t, next);continue;}data = next->data;if (__sync_bool_compare_and_swap(&head_, h, next)) {break;}}/* h->next = (qnode *)1; // bad address, It's a trap! *//* delete h; */hazard_mgr_.retireNode(h);return true; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
參考資料
總結
以上是生活随笔為你收集整理的无锁数据结构三:无锁数据结构的两大问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: selenium操作chrome时的一些
- 下一篇: JAVA并发编程: CAS和AQS