php 多线程处理redis,redis的多线程
目錄
先說明下redis也是多線程的.但是redis的主線程處理業(yè)務(wù).而其他三個線程跟主要功能是關(guān)系不到的
redis的三個線程主要是做什么
初始化入口void initServer(void) {
...
bioInit();
...
}
初始化后redis其他后臺線程.void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects
*
* 初始化 job 隊列,以及線程狀態(tài)
*/
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system
*
* 設(shè)置棧大小
*/
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of.
*
* 創(chuàng)建線程
*/
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
初始化三類線程. 這三類線程被認(rèn)為是后臺執(zhí)行.不影響主線程BIO_CLOSE_FILE . 關(guān)閉重寫之前的aof文件.
BIO_AOF_FSYNC . 定時刷新數(shù)據(jù)到磁盤上.
BIO_LAZY_FREE . 惰性刪除過期時間數(shù)據(jù)
redis為了保證其高效.一些比較耗時的動作會起線程或者進(jìn)程來完成.不會阻塞在業(yè)務(wù)主線程上.
使用多線程的特點(diǎn)創(chuàng)建3個線程.這個三個線程的功能互不影響
每個線程都有一個工作隊列.主線程生產(chǎn)任務(wù)放到任務(wù)隊里.這三個線程消費(fèi)這些任務(wù).
任務(wù)隊列和取出消費(fèi)的時候都得加鎖.防止競爭
使用條件變量來等待任務(wù).以及通知// 存放工作的隊列
static list *bio_jobs[REDIS_BIO_NUM_OPS];
bio_jobs是一個雙端鏈表結(jié)構(gòu)void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
// 將新工作推入隊列
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
當(dāng)有任務(wù)的時候.先把任務(wù)丟到redis工作隊列里.這里記得加鎖void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
redisLog(REDIS_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue.
*
* 取出(但不刪除)隊列中的首個任務(wù)
*/
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
// 執(zhí)行任務(wù)
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
// 將執(zhí)行完成的任務(wù)從隊列中刪除,并減少任務(wù)計數(shù)器
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}操作前先上鎖
從工作任務(wù)里取任務(wù)
解鎖
執(zhí)行業(yè)務(wù)邏輯
執(zhí)行完上鎖.重新pthread_cond_wait
條件變量
條件變量是利用線程間共享的全局變量進(jìn)行同步的一種機(jī)制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;
另一個線程使"條件成立"(給出條件成立信號)。
==為了防止競爭,條件變量的使用總是和一個互斥鎖結(jié)合在一起==
pthread_cond_wait原理
就是說pthread_cond_wait(pthread_cond_t cond, pthread_mutex_tmutex)函數(shù)傳入的參數(shù)mutex用于保護(hù)條件,因為我們在調(diào)用pthread_cond_wait時,如果條件不成立我們就進(jìn)入阻塞,但是進(jìn)入阻塞這個期間,如果條件變量改變了的話,那我們就漏掉了這個條件。因為這個線程還沒有放到等待隊列上,所以調(diào)用pthread_cond_wait前要先鎖互斥量,即調(diào)用pthread_mutex_lock()。
==pthread_cond_wait在把線程放進(jìn)阻塞隊列后,自動對mutex進(jìn)行解鎖,使得其它線程可以獲得加鎖的權(quán)利。這樣其它線程才能對臨界資源進(jìn)行訪問并在適當(dāng)?shù)臅r候喚醒這個阻塞的進(jìn)程。當(dāng)pthread_cond_wait返回的時候又自動給mutex加鎖==
總結(jié)
以上是生活随笔為你收集整理的php 多线程处理redis,redis的多线程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哈密尔顿算法matlab,复杂制造过程最
- 下一篇: php 打乱数组顺序_PHP实现大转盘抽