日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

自己动手实现自旋锁(spinlock)

發(fā)布時(shí)間:2025/6/15 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 自己动手实现自旋锁(spinlock) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

大多數(shù)的并行程序都需要在底層使用鎖機(jī)制進(jìn)行同步,簡(jiǎn)單來(lái)講,鎖無(wú)非是一套簡(jiǎn)單的原語(yǔ),它們保證程序(或進(jìn)程)對(duì)某一資源的互斥訪問(wèn)來(lái)維持?jǐn)?shù)據(jù)的一致性,如果沒(méi)有鎖機(jī)制作為保證,多個(gè)線程可能同時(shí)訪問(wèn)某一資源,假設(shè)沒(méi)有精心設(shè)計(jì)的(很復(fù)雜)無(wú)鎖算法保證程序正確執(zhí)行,那么后果往往非常嚴(yán)重的。無(wú)鎖算法難于使用,所以一般而言都使用鎖來(lái)保證程序的一致性。

如果更新某一數(shù)據(jù)結(jié)構(gòu)的操作比較緩慢,那么互斥的鎖是一個(gè)比較好的選擇,此時(shí)如果某一進(jìn)程或線程被阻塞,操作系統(tǒng)會(huì)重新接管控制權(quán),并調(diào)度其他進(jìn)程(或線程)繼續(xù)執(zhí)行,原先被阻塞的進(jìn)程處于睡眠狀態(tài)??刂茩?quán)的轉(zhuǎn)換伴隨著進(jìn)程上下文的切換,而這往往是一個(gè)昂貴而耗時(shí)的操作,所以對(duì)于等待鎖的時(shí)間比較短,那么應(yīng)該使用其他更高效的方法。


自旋鎖(spinlock)

自旋鎖(Spinlock)是一種常用的互斥(Mutual Exclusion)同步原語(yǔ)(Synchronization Primitive),試圖進(jìn)入臨界區(qū)(Critical Section)的線程使用忙等待(Busy Waiting)的方式檢測(cè)鎖的狀態(tài),若鎖未被持有則嘗試獲取。與其他鎖不同,自旋鎖僅僅只是“自旋”,即不停地檢查某一鎖是否已經(jīng)被解開(kāi),自旋鎖是非常快的,所以加鎖-解鎖操作耗時(shí)很短,然而,自旋鎖也不是萬(wàn)精油,當(dāng)因互斥導(dǎo)致進(jìn)程睡眠的時(shí)間很長(zhǎng)時(shí),使用自旋鎖是不明智的選擇。

下面我們考慮實(shí)現(xiàn)自己的自旋鎖,首先我們需要一些原語(yǔ),幸好GCC已經(jīng)為我們提供了一些內(nèi)置函數(shù),

#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
#define atomic_dec(P) __sync_add_and_fetch((P), -1)
#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V)))

然而,我們也需要自己實(shí)現(xiàn)其他的幾個(gè)原子操作,如下:

/* Compile read-write barrier */
#define barrier() asm volatile("": : :"memory")

/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")

/* Atomic exchange (of various sizes) */
static inline void *xchg_64(void *ptr, void *x)
{
__asm__ __volatile__("xchgq %0,%1"
:"=r" ((unsigned long long) x)
:"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
:"memory");

return x;
}

static inline unsigned xchg_32(void *ptr, unsigned x)
{
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");

return x;
}

static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
__asm__ __volatile__("xchgw %0,%1"
:"=r" ((unsigned short) x)
:"m" (*(volatile unsigned short *)ptr), "0" (x)
:"memory");

return x;
}

/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
char out;
__asm__ __volatile__("lock; bts %2,%1\n"
"sbb %0,%0\n"
:"=r" (out), "=m" (*(volatile long long *)ptr)
:"Ir" (x)
:"memory");

return out;
}

自旋鎖可以使用交換原語(yǔ)實(shí)現(xiàn),如下:

#define EBUSY 1
typedef unsigned spinlock;

static void spin_lock(spinlock *lock)
{
while (1)
{
if (!xchg_32(lock, EBUSY)) return;

while (*lock) cpu_relax();
}
}

static void spin_unlock(spinlock *lock)
{
barrier();
*lock = 0;
}

static int spin_trylock(spinlock *lock)
{
return xchg_32(lock, EBUSY);
}

上面的自旋鎖已經(jīng)能夠工作,但是也會(huì)產(chǎn)生問(wèn)題,因?yàn)槎鄠€(gè)線程可能產(chǎn)生競(jìng)爭(zhēng),因?yàn)樵阪i釋放的時(shí)候其他的每個(gè)線程都想獲得鎖。這會(huì)導(dǎo)致處理器總線的負(fù)載增大,從而使性能降低,所以接下來(lái)我們將實(shí)現(xiàn)另外一種自旋鎖,該自旋鎖能夠感知下一個(gè)獲得鎖的進(jìn)程或線程,因此能夠大大減輕處理器總線負(fù)載。

下面我們介紹另外一種自旋鎖,MCS自旋鎖,該鎖使用鏈表維護(hù)申請(qǐng)者的請(qǐng)求序列,

typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
mcs_lock_t *next;
int spin;
};
typedef struct mcs_lock_t *mcs_lock;

static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;

me->next = NULL;
me->spin = 0;

tail = xchg_64(m, me);

/* No one there? */
if (!tail) return;

/* Someone there, need to link in */
tail->next = me;

/* Make sure we do the above setting of next. */
barrier();

/* Spin on my spin variable */
while (!me->spin) cpu_relax();

return;
}

static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
/* No successor yet? */
if (!me->next)
{
/* Try to atomically unlock */
if (cmpxchg(m, me, NULL) == me) return;

/* Wait for successor to appear */
while (!me->next) cpu_relax();
}

/* Unlock next one */
me->next->spin = 1;
}

static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;

me->next = NULL;
me->spin = 0;

/* Try to lock */
tail = cmpxchg(m, NULL, &me);

/* No one was there - can quickly return */
if (!tail) return 0;

return EBUSY;
}

當(dāng)然,MCS鎖也是有問(wèn)題的,因?yàn)樗腁PI除了需要傳遞鎖的地址外,還需要傳遞另外一個(gè)結(jié)構(gòu),下面介紹另外一種自旋鎖算法,K42鎖算法,

typedef struct k42lock k42lock;
struct k42lock
{
k42lock *next;
k42lock *tail;
};

static void k42_lock(k42lock *l)
{
k42lock me;
k42lock *pred, *succ;
me.next = NULL;

barrier();

pred = xchg_64(&l->tail, &me);
if (pred)
{
me.tail = (void *) 1;

barrier();
pred->next = &me;
barrier();

while (me.tail) cpu_relax();
}

succ = me.next;

if (!succ)
{
barrier();
l->next = NULL;

if (cmpxchg(&l->tail, &me, &l->next) != &me)
{
while (!me.next) cpu_relax();

l->next = me.next;
}
}
else
{
l->next = succ;
}
}


static void k42_unlock(k42lock *l)
{
k42lock *succ = l->next;

barrier();

if (!succ)
{
if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;

while (!l->next) cpu_relax();
succ = l->next;
}

succ->tail = NULL;
}

static int k42_trylock(k42lock *l)
{
if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;

return EBUSY;
}

K42和MCS鎖都需要遍歷鏈表才能找到下一個(gè)最可能獲得鎖的進(jìn)程(或線程),有時(shí)查找可能比較費(fèi)時(shí),所以我們?cè)俅胃倪M(jìn)后:

typedef struct listlock_t listlock_t;
struct listlock_t
{
listlock_t *next;
int spin;
};
typedef struct listlock_t *listlock;

#define LLOCK_FLAG (void *)1

static void listlock_lock(listlock *l)
{
listlock_t me;
listlock_t *tail;

/* Fast path - no users */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;

me.next = LLOCK_FLAG;
me.spin = 0;

/* Convert into a wait list */
tail = xchg_64(l, &me);

if (tail)
{
/* Add myself to the list of waiters */
if (tail == LLOCK_FLAG) tail = NULL;
me.next = tail;

/* Wait for being able to go */
while (!me.spin) cpu_relax();

return;
}

/* Try to convert to an exclusive lock */
if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;

/* Failed - there is now a wait list */
tail = *l;

/* Scan to find who is after me */
while (1)
{
/* Wait for them to enter their next link */
while (tail->next == LLOCK_FLAG) cpu_relax();

if (tail->next == &me)
{
/* Fix their next pointer */
tail->next = NULL;

return;
}

tail = tail->next;
}
}

static void listlock_unlock(listlock *l)
{
listlock_t *tail;
listlock_t *tp;

while (1)
{
tail = *l;

barrier();

/* Fast path */
if (tail == LLOCK_FLAG)
{
if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;

continue;
}

tp = NULL;

/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();

/* There is a wait list */
if (tail->next) break;

/* Try to convert to a single-waiter lock */
if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
{
/* Unlock */
tail->spin = 1;

return;
}

cpu_relax();
}

/* A long list */
tp = tail;
tail = tail->next;

/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();

if (!tail->next) break;

tp = tail;
tail = tail->next;
}

tp->next = NULL;

barrier();

/* Unlock */
tail->spin = 1;
}

static int listlock_trylock(listlock *l)
{
/* Simple part of a spin-lock */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;

/* Failure! */
return EBUSY;

等等,還可以改進(jìn),可以在自旋鎖里面嵌套一層自旋鎖,

typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
bitlistlock_t *next;
int spin;
};

typedef bitlistlock_t *bitlistlock;

#define BLL_USED ((bitlistlock_t *) -2LL)

static void bitlistlock_lock(bitlistlock *l)
{
bitlistlock_t me;
bitlistlock_t *tail;

/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();

/* Remove locked bit */
tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);

/* Fast path, no waiters */
if (!tail)
{
/* Set to be a flag value */
*l = BLL_USED;
return;
}

if (tail == BLL_USED) tail = NULL;
me.next = tail;
me.spin = 0;

barrier();

/* Unlock, and add myself to the wait list */
*l = &me;

/* Wait for the go-ahead */
while (!me.spin) cpu_relax();
}

static void bitlistlock_unlock(bitlistlock *l)
{
bitlistlock_t *tail;
bitlistlock_t *tp;

/* Fast path - no wait list */
if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;

/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();

tp = *l;

barrier();

/* Get end of list */
tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);

/* Actually no users? */
if (tail == BLL_USED)
{
barrier();
*l = NULL;
return;
}

/* Only one entry on wait list? */
if (!tail->next)
{
barrier();

/* Unlock bitlock */
*l = BLL_USED;

barrier();

/* Unlock lock */
tail->spin = 1;

return;
}

barrier();

/* Unlock bitlock */
*l = tail;

barrier();

/* Scan wait list for start */
do
{
tp = tail;
tail = tail->next;
}
while (tail->next);

tp->next = NULL;

barrier();

/* Unlock */
tail->spin = 1;
}

static int bitlistlock_trylock(bitlistlock *l)
{
if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;

return EBUSY;
}

還可以再次改進(jìn),如下

/* Bit-lock for editing the wait block */
#define SLOCK_LOCK 1
#define SLOCK_LOCK_BIT 0

/* Has an active user */
#define SLOCK_USED 2

#define SLOCK_BITS 3

typedef struct slock slock;
struct slock
{
uintptr_t p;
};

typedef struct slock_wb slock_wb;
struct slock_wb
{
/*
* last points to the last wait block in the chain.
* The value is only valid when read from the first wait block.
*/
slock_wb *last;

/* next points to the next wait block in the chain. */
slock_wb *next;

/* Wake up? */
int wake;
};

/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
uintptr_t p;

/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
{
cpu_relax();
}

p = s->p;

if (p <= SLOCK_BITS)
{
/* Oops, looks like the wait block was removed. */
atomic_dec(&s->p);
return NULL;
}

return (slock_wb *)(p - SLOCK_LOCK);
}

static void slock_lock(slock *s)
{
slock_wb swblock;

/* Fastpath - no other readers or writers */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;

/* Initialize wait block */
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = 0;

while (1)
{
uintptr_t p = s->p;

cpu_relax();

/* Fastpath - no other readers or writers */
if (!p)
{
if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
continue;
}

if (p > SLOCK_BITS)
{
slock_wb *first_wb, *last;

first_wb = slockwb(s);
if (!first_wb) continue;

last = first_wb->last;
last->next = &swblock;
first_wb->last = &swblock;

/* Unlock */
barrier();
s->p &= ~SLOCK_LOCK;

break;
}

/* Try to add the first wait block */
if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
}

/* Wait to acquire exclusive lock */
while (!swblock.wake) cpu_relax();
}


static void slock_unlock(slock *s)
{
slock_wb *next;
slock_wb *wb;
uintptr_t np;

while (1)
{
uintptr_t p = s->p;

/* This is the fast path, we can simply clear the SRWLOCK_USED bit. */
if (p == SLOCK_USED)
{
if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
continue;
}

/* There's a wait block, we need to wake the next pending user */
wb = slockwb(s);
if (wb) break;

cpu_relax();
}

next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t) next;

next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
np = SLOCK_USED;
}

barrier();
/* Also unlocks lock bit */
s->p = np;
barrier();

/* Notify the next waiter */
wb->wake = 1;

/* We released the lock */
}

static int slock_trylock(slock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;

return EBUSY;
}

下面是另外一種實(shí)現(xiàn)方式,稱為stack-lock算法,

typedef struct stlock_t stlock_t;
struct stlock_t
{
stlock_t *next;
};

typedef struct stlock_t *stlock;

static __attribute__((noinline)) void stlock_lock(stlock *l)
{
stlock_t *me = NULL;

barrier();
me = xchg_64(l, &me);

/* Wait until we get the lock */
while (me) cpu_relax();
}

#define MAX_STACK_SIZE (1<<12)

static __attribute__((noinline)) int on_stack(void *p)
{
int x;

uintptr_t u = (uintptr_t) &x;

return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}

static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
stlock_t *tail = *l;
barrier();

/* Fast case */
if (on_stack(tail))
{
/* Try to remove the wait list */
if (cmpxchg(l, tail, NULL) == tail) return;

tail = *l;
}

/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();

if (on_stack(tail->next)) break;

tail = tail->next;
}

barrier();

/* Unlock */
tail->next = NULL;
}

static int stlock_trylock(stlock *l)
{
stlock_t me;

if (!cmpxchg(l, NULL, &me)) return 0;

return EBUSY;
}

改進(jìn)后變成,

typedef struct plock_t plock_t;
struct plock_t
{
plock_t *next;
};

typedef struct plock plock;
struct plock
{
plock_t *next;
plock_t *prev;
plock_t *last;
};

static void plock_lock(plock *l)
{
plock_t *me = NULL;
plock_t *prev;

barrier();
me = xchg_64(l, &me);

prev = NULL;

/* Wait until we get the lock */
while (me)
{
/* Scan wait list for my previous */
if (l->next != (plock_t *) &me)
{
plock_t *t = l->next;

while (me)
{
if (t->next == (plock_t *) &me)
{
prev = t;

while (me) cpu_relax();

goto done;
}

if (t->next) t = t->next;
cpu_relax();
}
}
cpu_relax();
}

done:
l->prev = prev;
l->last = (plock_t *) &me;
}

static void plock_unlock(plock *l)
{
plock_t *tail;

/* Do I know my previous? */
if (l->prev)
{
/* Unlock */
l->prev->next = NULL;
return;
}

tail = l->next;
barrier();

/* Fast case */
if (tail == l->last)
{
/* Try to remove the wait list */
if (cmpxchg(&l->next, tail, NULL) == tail) return;

tail = l->next;
}

/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();

if (tail->next == l->last) break;

tail = tail->next;
}

barrier();

/* Unlock */
tail->next = NULL;
}

static int plock_trylock(plock *l)
{
plock_t me;

if (!cmpxchg(&l->next, NULL, &me))
{
l->last = &me;
return 0;
}

return EBUSY;
}

下面介紹另外一種算法,ticket lock算法,實(shí)際上,Linux內(nèi)核正是采用了該算法,不過(guò)考慮到執(zhí)行效率,人家是以匯編形式寫(xiě)的,

typedef union ticketlock ticketlock;

union ticketlock
{
unsigned u;
struct
{
unsigned short ticket;
unsigned short users;
} s;
};

static void ticket_lock(ticketlock *t)
{
unsigned short me = atomic_xadd(&t->s.users, 1);

while (t->s.ticket != me) cpu_relax();
}

static void ticket_unlock(ticketlock *t)
{
barrier();
t->s.ticket++;
}

static int ticket_trylock(ticketlock *t)
{
unsigned short me = t->s.users;
unsigned short menew = me + 1;
unsigned cmp = ((unsigned) me << 16) + me;
unsigned cmpnew = ((unsigned) menew << 16) + me;

if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;

return EBUSY;
}

static int ticket_lockable(ticketlock *t)
{
ticketlock u = *t;
barrier();
return (u.s.ticket == u.s.users);
}

至此,自旋鎖各種不同的實(shí)現(xiàn)介紹完畢,親,你明白了嗎?:)

(全文完)




總結(jié)

以上是生活随笔為你收集整理的自己动手实现自旋锁(spinlock)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。