linux內(nèi)核級(jí)同步機(jī)制--futex

在面試中關(guān)于多線程同步,你必須要思考的問題 一文中,我們知道glibc的pthread_cond_timedwait底層是用linux futex機(jī)制實(shí)現(xiàn)的。

創(chuàng)新互聯(lián)長期為上千多家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為陽西企業(yè)提供專業(yè)的網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站制作,陽西網(wǎng)站改版等技術(shù)服務(wù)。擁有十載豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

理想的同步機(jī)制應(yīng)該是沒有鎖沖突時(shí)在用戶態(tài)利用原子指令就解決問題,而需要掛起等待時(shí)再使用內(nèi)核提供的系統(tǒng)調(diào)用進(jìn)行睡眠與喚醒。換句話說,在用戶態(tài)的自旋失敗時(shí),能不能讓進(jìn)程掛起,由持有鎖的線程釋放鎖時(shí)將其喚醒?
如果你沒有較深入地考慮過這個(gè)問題,很可能想當(dāng)然的認(rèn)為類似于這樣就行了(偽代碼):

void?lock(int?lockval)?{	//trylock是用戶級(jí)的自旋鎖
	while(!trylock(lockval))?{
		wait();//釋放cpu,并將當(dāng)期線程加入等待隊(duì)列,是系統(tǒng)調(diào)用
	}
}boolean?trylock(int?lockval){	int?i=0;?
	//localval=1代表上鎖成功
	while(!compareAndSet(lockval,0,1)){		if(++i>10){			return?false;
		}
	}	return?true;
}void?unlock(int?lockval)?{
	?compareAndSet(lockval,1,0);
	?notify();
}

上述代碼的問題是trylock和wait兩個(gè)調(diào)用之間存在一個(gè)窗口:
如果一個(gè)線程trylock失敗,在調(diào)用wait時(shí)持有鎖的線程釋放了鎖,當(dāng)前線程還是會(huì)調(diào)用wait進(jìn)行等待,但之后就沒有人再喚醒該線程了。

為了解決上述問題,linux內(nèi)核引入了futex機(jī)制,futex主要包括等待和喚醒兩個(gè)方法:futex_waitfutex_wake,其定義如下

//uaddr指向一個(gè)地址,val代表這個(gè)地址期待的值,當(dāng)*uaddr==val時(shí),才會(huì)進(jìn)行waitint?futex_wait(int?*uaddr,?int?val);//喚醒n個(gè)在uaddr指向的鎖變量上掛起等待的進(jìn)程int?futex_wake(int?*uaddr,?int?n);

futex在真正將進(jìn)程掛起之前會(huì)檢查addr指向的地址的值是否等于val,如果不相等則會(huì)立即返回,由用戶態(tài)繼續(xù)trylock。否則將當(dāng)期線程插入到一個(gè)隊(duì)列中去,并掛起。

在關(guān)于同步的一點(diǎn)思考-上文章中對(duì)futex的背景與基本原理有介紹,對(duì)futex不熟悉的人可以先看下。

本文將深入分析futex的實(shí)現(xiàn),讓讀者對(duì)于鎖的最底層實(shí)現(xiàn)方式有直觀認(rèn)識(shí),再結(jié)合之前的兩篇文章(關(guān)于同步的一點(diǎn)思考-上和關(guān)于同步的一點(diǎn)思考-下)能對(duì)操作系統(tǒng)的同步機(jī)制有個(gè)全面的理解。

下文中的進(jìn)程一詞包括常規(guī)進(jìn)程與線程。

futex_wait

在看下面的源碼分析前,先思考一個(gè)問題:如何確保掛起進(jìn)程時(shí),val的值是沒有被其他進(jìn)程修改過的?

代碼在kernel/futex.c中

static?int?futex_wait(u32?__user?*uaddr,?int?fshared,
		??????u32?val,?ktime_t?*abs_time,?u32?bitset,?int?clockrt){	struct?hrtimer_sleeper?timeout,?*to?=?NULL;
	struct?restart_block?*restart;
	struct?futex_hash_bucket?*hb;
	struct?futex_q?q;
	int?ret;

	...	//設(shè)置hrtimer定時(shí)任務(wù):在一定時(shí)間(abs_time)后,如果進(jìn)程還沒被喚醒則喚醒wait的進(jìn)程
	if?(abs_time)?{
	????...
		hrtimer_init_sleeper(to,?current);
		...
	}

retry:	//該函數(shù)中判斷uaddr指向的值是否等于val,以及一些初始化操作
	ret?=?futex_wait_setup(uaddr,?val,?fshared,?&q,?&hb);	//如果val發(fā)生了改變,則直接返回
	if?(ret)		goto?out;	//將當(dāng)前進(jìn)程狀態(tài)改為TASK_INTERRUPTIBLE,并插入到futex等待隊(duì)列,然后重新調(diào)度。
	futex_wait_queue_me(hb,?&q,?to);	/*?If?we?were?woken?(and?unqueued),?we?succeeded,?whatever.?*/
	ret?=?0;	//如果unqueue_me成功,則說明是超時(shí)觸發(fā)(因?yàn)閒utex_wake喚醒時(shí),會(huì)將該進(jìn)程移出等待隊(duì)列,所以這里會(huì)失敗)
	if?(!unqueue_me(&q))		goto?out_put_key;
	ret?=?-ETIMEDOUT;	if?(to?&&?!to->task)		goto?out_put_key;	/*
	?*?We?expect?signal_pending(current),?but?we?might?be?the
	?*?victim?of?a?spurious?wakeup?as?well.
	?*/
	if?(!signal_pending(current))?{
		put_futex_key(fshared,?&q.key);		goto?retry;
	}

	ret?=?-ERESTARTSYS;	if?(!abs_time)		goto?out_put_key;

	...

out_put_key:
	put_futex_key(fshared,?&q.key);
out:	if?(to)?{		//取消定時(shí)任務(wù)
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}	return?ret;
}

在將進(jìn)程阻塞前會(huì)將當(dāng)期進(jìn)程插入到一個(gè)等待隊(duì)列中,需要注意的是這里說的等待隊(duì)列其實(shí)是一個(gè)類似Java HashMap的結(jié)構(gòu),全局唯一。

struct?futex_hash_bucket?{
	spinlock_t?lock;	//雙向鏈表
	struct?plist_head?chain;};static?struct?futex_hash_bucket?futex_queues[1<<FUTEX_HASHBITS];

著重看futex_wait_setup和兩個(gè)函數(shù)futex_wait_queue_me

static?int?futex_wait_setup(u32?__user?*uaddr,?u32?val,?int?fshared,			???struct?futex_q?*q,?struct?futex_hash_bucket?**hb){
	u32?uval;	int?ret;
retry:
	q->key?=?FUTEX_KEY_INIT;	//初始化futex_q
	ret?=?get_futex_key(uaddr,?fshared,?&q->key,?VERIFY_READ);	if?(unlikely(ret?!=?0))		return?ret;

retry_private:	//獲得自旋鎖
	*hb?=?queue_lock(q);	//原子的將uaddr的值設(shè)置到uval中
	ret?=?get_futex_value_locked(&uval,?uaddr);

???...?
	//如果當(dāng)期uaddr指向的值不等于val,即說明其他進(jìn)程修改了
	//uaddr指向的值,等待條件不再成立,不用阻塞直接返回。
	if?(uval?!=?val)?{		//釋放鎖
		queue_unlock(q,?*hb);
		ret?=?-EWOULDBLOCK;
	}

???...	return?ret;
}

函數(shù)futex_wait_setup中主要做了兩件事,一是獲得自旋鎖,二是判斷*uaddr是否為預(yù)期值。

static?void?futex_wait_queue_me(struct?futex_hash_bucket?*hb,?struct?futex_q?*q,
				struct?hrtimer_sleeper?*timeout)
{	//設(shè)置進(jìn)程狀態(tài)為TASK_INTERRUPTIBLE,cpu調(diào)度時(shí)只會(huì)選擇
	//狀態(tài)為TASK_RUNNING的進(jìn)程
	set_current_state(TASK_INTERRUPTIBLE);	//將當(dāng)期進(jìn)程(q封裝)插入到等待隊(duì)列中去,然后釋放自旋鎖
	queue_me(q,?hb);	//啟動(dòng)定時(shí)任務(wù)
	if?(timeout)?{
		hrtimer_start_expires(&timeout->timer,?HRTIMER_MODE_ABS);		if?(!hrtimer_active(&timeout->timer))
			timeout->task?=?NULL;
	}	/*
	?*?If?we?have?been?removed?from?the?hash?list,?then?another?task
	?*?has?tried?to?wake?us,?and?we?can?skip?the?call?to?schedule().
	?*/
	if?(likely(!plist_node_empty(&q->list)))?{		?
		?//如果沒有設(shè)置過期時(shí)間?||?設(shè)置了過期時(shí)間且還沒過期
		if?(!timeout?||?timeout->task)			//系統(tǒng)重新進(jìn)行進(jìn)程調(diào)度,這個(gè)時(shí)候cpu會(huì)去執(zhí)行其他進(jìn)程,該進(jìn)程會(huì)阻塞在這里
			schedule();
	}	//走到這里說明又被cpu選中運(yùn)行了
	__set_current_state(TASK_RUNNING);
}

futex_wait_queue_me中主要做幾件事:

  1. 將當(dāng)期進(jìn)程插入到等待隊(duì)列

  2. 啟動(dòng)定時(shí)任務(wù)

  3. 重新調(diào)度進(jìn)程

如何保證條件與等待之間的原子性

futex_wait_setup方法中會(huì)加自旋鎖;在futex_wait_queue_me中將狀態(tài)設(shè)置為TASK_INTERRUPTIBLE,調(diào)用queue_me將當(dāng)期線程插入到等待隊(duì)列中,然后才釋放自旋鎖。也就是說檢查uaddr的值的過程跟進(jìn)程掛起的過程放在同一個(gè)臨界區(qū)中。當(dāng)釋放自旋鎖后,這時(shí)再更改addr地址的值已經(jīng)沒有關(guān)系了,因?yàn)楫?dāng)期進(jìn)程已經(jīng)加入到等待隊(duì)列中,能被wake喚醒,不會(huì)出現(xiàn)本文開頭提到的沒人喚醒的問題。

futex_wait小結(jié)

總結(jié)下futex_wait流程:

  1. 加自旋鎖

  2. 檢測*uaddr是否等于val,如果不相等則會(huì)立即返回

  3. 將進(jìn)程狀態(tài)設(shè)置為TASK_INTERRUPTIBLE

  4. 將當(dāng)期進(jìn)程插入到等待隊(duì)列中

  5. 釋放自旋鎖

  6. 創(chuàng)建定時(shí)任務(wù):當(dāng)超過一定時(shí)間還沒被喚醒時(shí),將進(jìn)程喚醒

  7. 掛起當(dāng)前進(jìn)程

futex_wake

futex_wake

static?int?futex_wake(u32?__user?*uaddr,?int?fshared,?int?nr_wake,?u32?bitset){	struct?futex_hash_bucket?*hb;
	struct?futex_q?*this,?*next;
	struct?plist_head?*head;
	union?futex_key?key?=?FUTEX_KEY_INIT;	int?ret;

	...	//根據(jù)uaddr的值填充&key的內(nèi)容
	ret?=?get_futex_key(uaddr,?fshared,?&key,?VERIFY_READ);	if?(unlikely(ret?!=?0))		goto?out;	//根據(jù)&key獲得對(duì)應(yīng)uaddr所在的futex_hash_bucket
	hb?=?hash_futex(&key);	//對(duì)該hb加自旋鎖
	spin_lock(&hb->lock);
	head?=?&hb->chain;	//遍歷該hb的鏈表,注意鏈表中存儲(chǔ)的節(jié)點(diǎn)是plist_node類型,而而這里的this卻是futex_q類型,這種類型轉(zhuǎn)換是通過c中的container_of機(jī)制實(shí)現(xiàn)的
	plist_for_each_entry_safe(this,?next,?head,?list)?{		if?(match_futex?(&this->key,?&key))?{
			...			//喚醒對(duì)應(yīng)進(jìn)程
			wake_futex(this);			if?(++ret?>=?nr_wake)				break;
		}
	}	//釋放自旋鎖
	spin_unlock(&hb->lock);
	put_futex_key(fshared,?&key);
out:	return?ret;
}

futex_wake流程如下:

  1. 找到uaddr對(duì)應(yīng)的futex_hash_bucket,即代碼中的hb

  2. 對(duì)hb加自旋鎖

  3. 遍歷fb的鏈表,找到uaddr對(duì)應(yīng)的節(jié)點(diǎn)

  4. 調(diào)用wake_futex喚起等待的進(jìn)程

  5. 釋放自旋鎖

wake_futex中將制定進(jìn)程狀態(tài)設(shè)置為TASK_RUNNING并加入到系統(tǒng)調(diào)度列表中,同時(shí)將進(jìn)程從futex的等待隊(duì)列中移除掉,具體代碼就不分析了,有興趣的可以自行研究。

End

Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進(jìn)行線程同步,理解futex的實(shí)現(xiàn)能幫助你更好的理解與使用這些上層的同步機(jī)制。另外因篇幅與精力有限,涉及到進(jìn)程調(diào)度的相關(guān)內(nèi)容沒有具體分析,不過并不妨礙理解文章內(nèi)容。

網(wǎng)頁名稱:linux內(nèi)核級(jí)同步機(jī)制--futex
分享鏈接:http://www.muchs.cn/article36/jpcdsg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、外貿(mào)網(wǎng)站建設(shè)軟件開發(fā)、網(wǎng)站內(nèi)鏈網(wǎng)頁設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)