Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

這篇文章將為大家詳細(xì)講解有關(guān)Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

創(chuàng)新互聯(lián)公司專注于奎屯網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供奎屯營(yíng)銷型網(wǎng)站建設(shè),奎屯網(wǎng)站制作、奎屯網(wǎng)頁(yè)設(shè)計(jì)、奎屯網(wǎng)站官網(wǎng)定制、小程序開發(fā)服務(wù),打造奎屯網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供奎屯網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

介紹

在 Go 的 1.14 版本之前搶占試調(diào)度都是基于協(xié)作的,需要自己主動(dòng)的讓出執(zhí)行,但是這樣是無(wú)法處理一些無(wú)法被搶占的邊緣情況。例如:for 循環(huán)或者垃圾回收長(zhǎng)時(shí)間占用線程,這些問題中的一部分直到 1.14 才被基于信號(hào)的搶占式調(diào)度解決。

下面我們通過一個(gè)例子來(lái)驗(yàn)證一下1.14 版本和 1.13 版本之間的搶占差異:

package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/trace"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(1)
	f, _ := os.Create("trace.output")
	defer f.Close()
	_ = trace.Start(f)
	defer trace.Stop()
	var wg sync.WaitGroup
	for i := 0; i < 30; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := 0
			for i:=0;i<1e8;i++ {
				t+=2
			}
			fmt.Println("total:", t)
		}()
	}
	wg.Wait()
}

這個(gè)例子中會(huì)通過 go trace 來(lái)進(jìn)行執(zhí)行過程的調(diào)用跟蹤。在代碼中指定 runtime.GOMAXPROCS(1)設(shè)置最大的可同時(shí)使用的 CPU 核數(shù)為1,只用一個(gè) P(處理器),這樣就確保是單處理器的場(chǎng)景。然后調(diào)用一個(gè) for 循環(huán)開啟 10 個(gè) goroutines 來(lái)執(zhí)行 func 函數(shù),這是一個(gè)純計(jì)算且耗時(shí)的函數(shù),防止 goroutines 空閑讓出執(zhí)行。

下面我們編譯程序分析 trace 輸出:

$ go build -gcflags "-N -l" main.go 
-N表示禁用優(yōu)化
-l禁用內(nèi)聯(lián)

$ ./main

然后我們獲取到 trace.output 文件后進(jìn)行可視化展示:

$ go tool trace -http=":6060" ./trace.output

Go1.13 trace 分析

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

從上面的這個(gè)圖可以看出:

  1. 因?yàn)槲覀兿薅酥挥幸粋€(gè) P,所以在 PROCS 這一欄里面只有一個(gè) Proc0;

  2. 我們?cè)?for 循環(huán)里面啟動(dòng)了 30 個(gè) goroutines ,所以我們可以數(shù)一下 Proc0 里面的顏色框框,剛好30 個(gè);

  3. 30 個(gè) goroutines 在 Proc0 里面是串行執(zhí)行的,一個(gè)執(zhí)行完再執(zhí)行另一個(gè),沒有進(jìn)行搶占;

  4. 隨便點(diǎn)擊一個(gè) goroutines 的詳情欄可以看到 Wall Duration 為 0.23s 左右,表示這個(gè) goroutines 持續(xù)執(zhí)行了 0.23s,總共 10 個(gè) goroutines 執(zhí)行時(shí)間是 7s 左右;

  5. 切入調(diào)用棧 Start Stack Trace 是 main.main.func1:20,在代碼上面是 func 函數(shù)執(zhí)行頭: go func() ;

  6. 切走調(diào)用棧 End Stack Trace 是 main.main.func1:26,在代碼上是 func 函數(shù)最后執(zhí)行打?。?code>fmt.Println("total:", t);

從上面的 trace 分析可以知道,Go 的協(xié)作式調(diào)度對(duì) calcSum 函數(shù)是毫無(wú)作用的,一旦執(zhí)行開始,只能等執(zhí)行結(jié)束。每個(gè) goroutine 耗費(fèi)了 0.23s 這么長(zhǎng)的時(shí)間,也無(wú)法搶占它的執(zhí)行權(quán)。

Go 1.14 以上 trace 分析

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

在 Go 1.14 之后引入了基于信號(hào)的搶占式調(diào)度,從上面的圖可以看到 Proc0 這一欄中密密麻麻都是 goroutines 在切換時(shí)的調(diào)用情況,不會(huì)再出現(xiàn) goroutines 一旦執(zhí)行開始,只能等執(zhí)行結(jié)束這種情況。

上面跑動(dòng)的時(shí)間是 4s 左右這個(gè)情況可以忽略,因?yàn)槲沂窃趦膳_(tái)配置不同的機(jī)器上跑的(主要是我閑麻煩要找兩臺(tái)一樣的機(jī)器)。

下面我們拉近了看一下明細(xì)情況:

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

通過這個(gè)明細(xì)可以看出:

  1. 這個(gè) goroutine 運(yùn)行了 0.025s 就讓出執(zhí)行了;

  2. 切入調(diào)用棧 Start Stack Trace 是 main.main.func1:21,和上面一樣;

  3. 切走調(diào)用棧 End Stack Trace 是 runtime.asyncPreempt:50 ,這個(gè)函數(shù)是收到搶占信號(hào)時(shí)執(zhí)行的函數(shù),從這個(gè)地方也能明確的知道,被異步搶占了;

分析

搶占信號(hào)的安裝

runtime/signal_unix.go

程序啟動(dòng)時(shí),在runtime.sighandler中注冊(cè) SIGURG 信號(hào)的處理函數(shù)runtime.doSigPreempt。

initsig

func initsig(preinit bool) {
	// 預(yù)初始化
	if !preinit { 
		signalsOK = true
	} 
	//遍歷信號(hào)數(shù)組
	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
		//略過信號(hào):SIGKILL、SIGSTOP、SIGTSTP、SIGCONT、SIGTTIN、SIGTTOU
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		} 
		...  
		setsig(i, funcPC(sighandler))
	}
}

在 initsig 函數(shù)里面會(huì)遍歷所有的信號(hào)量,然后調(diào)用 setsig 函數(shù)進(jìn)行注冊(cè)。我們可以查看 sigtable 這個(gè)全局變量看看有什么信息:

var sigtable = [...]sigTabT{
	/* 0 */ {0, "SIGNONE: no trap"},
	/* 1 */ {_SigNotify + _SigKill, "SIGHUP: terminal line hangup"},
	/* 2 */ {_SigNotify + _SigKill, "SIGINT: interrupt"},
	/* 3 */ {_SigNotify + _SigThrow, "SIGQUIT: quit"},
	/* 4 */ {_SigThrow + _SigUnblock, "SIGILL: illegal instruction"},
	/* 5 */ {_SigThrow + _SigUnblock, "SIGTRAP: trace trap"},
	/* 6 */ {_SigNotify + _SigThrow, "SIGABRT: abort"},
	/* 7 */ {_SigPanic + _SigUnblock, "SIGBUS: bus error"},
	/* 8 */ {_SigPanic + _SigUnblock, "SIGFPE: floating-point exception"},
	/* 9 */ {0, "SIGKILL: kill"},
	/* 10 */ {_SigNotify, "SIGUSR1: user-defined signal 1"},
	/* 11 */ {_SigPanic + _SigUnblock, "SIGSEGV: segmentation violation"},
	/* 12 */ {_SigNotify, "SIGUSR2: user-defined signal 2"},
	/* 13 */ {_SigNotify, "SIGPIPE: write to broken pipe"},
	/* 14 */ {_SigNotify, "SIGALRM: alarm clock"},
	/* 15 */ {_SigNotify + _SigKill, "SIGTERM: termination"},
	/* 16 */ {_SigThrow + _SigUnblock, "SIGSTKFLT: stack fault"},
	/* 17 */ {_SigNotify + _SigUnblock + _SigIgn, "SIGCHLD: child status has changed"},
	/* 18 */ {_SigNotify + _SigDefault + _SigIgn, "SIGCONT: continue"},
	/* 19 */ {0, "SIGSTOP: stop, unblockable"},
	/* 20 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTSTP: keyboard stop"},
	/* 21 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTIN: background read from tty"},
	/* 22 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTOU: background write to tty"},
  				 
	/* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
	/* 24 */ {_SigNotify, "SIGXCPU: cpu limit exceeded"},
	/* 25 */ {_SigNotify, "SIGXFSZ: file size limit exceeded"},
	/* 26 */ {_SigNotify, "SIGVTALRM: virtual alarm clock"},
	/* 27 */ {_SigNotify + _SigUnblock, "SIGPROF: profiling alarm clock"},
	/* 28 */ {_SigNotify + _SigIgn, "SIGWINCH: window size change"},
	/* 29 */ {_SigNotify, "SIGIO: i/o now possible"},
	/* 30 */ {_SigNotify, "SIGPWR: power failure restart"},
	/* 31 */ {_SigThrow, "SIGSYS: bad system call"},
	/* 32 */ {_SigSetStack + _SigUnblock, "signal 32"}, /* SIGCANCEL; see issue 6997 */
	/* 33 */ {_SigSetStack + _SigUnblock, "signal 33"}, /* SIGSETXID; see issues 3871, 9400, 12498 */
	...
}

具體的信號(hào)含義可以看這個(gè)介紹:Unix信號(hào) https://zh.wikipedia.org/wiki/Unix信號(hào)。需要注意的是,搶占信號(hào)在這里是 _SigNotify + _SigIgn 如下:

{_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"}

下面我們看一下 setsig 函數(shù),這個(gè)函數(shù)是在 runtime/os_linux.go文件里面:

setsig

func setsig(i uint32, fn uintptr) {
	var sa sigactiont
	sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART
	sigfillset(&sa.sa_mask)
	...
	if fn == funcPC(sighandler) {
        // CGO 相關(guān)
		if iscgo {
			fn = funcPC(cgoSigtramp)
		} else {
            // 替換為調(diào)用 sigtramp
			fn = funcPC(sigtramp)
		}
	}
	sa.sa_handler = fn
	sigaction(i, &sa, nil)
}

這里需要注意的是,當(dāng) fn 等于 sighandler 的時(shí)候,調(diào)用的函數(shù)會(huì)被替換成 sigtramp。sigaction 函數(shù)在 Linux 下會(huì)調(diào)用系統(tǒng)調(diào)用函數(shù) sys_signal 以及 sys_rt_sigaction 實(shí)現(xiàn)安裝信號(hào)。

執(zhí)行搶占信號(hào)

到了這里是信號(hào)發(fā)生的時(shí)候進(jìn)行信號(hào)的處理,原本應(yīng)該是在發(fā)送搶占信號(hào)之后,但是這里我先順著安裝信號(hào)往下先講了。大家可以跳到發(fā)送搶占信號(hào)后再回來(lái)。

上面分析可以看到當(dāng) fn 等于 sighandler 的時(shí)候,調(diào)用的函數(shù)會(huì)被替換成 sigtramp,sigtramp是匯編實(shí)現(xiàn),下面我們看看。

src/runtime/sys_linux_amd64.s:

TEXT runtime·sigtramp<ABIInternal>(SB),NOSPLIT,$72
	...
	// We don't save mxcsr or the x87 control word because sigtrampgo doesn't
	// modify them.

	MOVQ	DX, ctx-56(SP)
	MOVQ	SI, info-64(SP)
	MOVQ	DI, signum-72(SP)
	MOVQ	$runtime·sigtrampgo(SB), AX
	CALL AX

	...
	RET

這里會(huì)被調(diào)用說(shuō)明信號(hào)已經(jīng)發(fā)送響應(yīng)了,runtime·sigtramp會(huì)進(jìn)行信號(hào)的處理。runtime·sigtramp會(huì)繼續(xù)調(diào)用 runtime·sigtrampgo 。

這個(gè)函數(shù)在 runtime/signal_unix.go文件中:

sigtrampgo&sighandler

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
	if sigfwdgo(sig, info, ctx) {
		return
	}
	c := &sigctxt{info, ctx}
	g := sigFetchG(c)
	... 
	sighandler(sig, info, ctx, g)
	setg(g)
	if setStack {
		restoreGsignalStack(&gsignalStack)
	}
}


func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	_g_ := getg()
	c := &sigctxt{info, ctxt}
	... 
  // 如果是一個(gè)搶占信號(hào)
	if sig == sigPreempt && debug.asyncpreemptoff == 0 { 
   		// 處理?yè)屨夹盘?hào)
		doSigPreempt(gp, c) 
	}

	...
}

sighandler 方法里面做了很多其他信號(hào)的處理工作,我們只關(guān)心搶占部分的代碼,這里最終會(huì)通過 doSigPreempt 方法執(zhí)行搶占。

這個(gè)函數(shù)在 runtime/signal_unix.go文件中:

doSigPreempt

func doSigPreempt(gp *g, ctxt *sigctxt) { 
	// 檢查此 G 是否要被搶占并且可以安全地?fù)屨?
	if wantAsyncPreempt(gp) { 
		// 檢查是否能安全的進(jìn)行搶占
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// 修改寄存器,并執(zhí)行搶占調(diào)用
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}
 
	// 更新一下?lián)屨枷嚓P(guān)字段
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0) 
}

函數(shù)會(huì)處理?yè)屨夹盘?hào),獲取當(dāng)前的 SP 和 PC 寄存器并調(diào)用 ctxt.pushCall修改寄存器,并調(diào)用 runtime/preempt.go 的 asyncPreempt 函數(shù)。

// 保存用戶態(tài)寄存器后調(diào)用asyncPreempt2
func asyncPreempt()

asyncPreempt 的匯編代碼在 src/runtime/preempt_amd64.s中,該函數(shù)會(huì)保存用戶態(tài)寄存器后調(diào)用 runtime/preempt.go 的 asyncPreempt2 函數(shù)中:

asyncPreempt2

func asyncPreempt2() {
	gp := getg()
	gp.asyncSafePoint = true
	// 該 G 是否可以被搶占 
	if gp.preemptStop { 
		mcall(preemptPark)
	} else { 
    	// 讓 G 放棄當(dāng)前在 M 上的執(zhí)行權(quán)利,將 G 放入全局隊(duì)列等待后續(xù)調(diào)度
		mcall(gopreempt_m)
	}
	gp.asyncSafePoint = false
}

該函數(shù)會(huì)獲取當(dāng)前 G ,然后判斷 G 的 preemptStop 值,preemptStop 會(huì)在調(diào)用 runtime/preempt.go的 suspendG 函數(shù)的時(shí)候?qū)?_Grunning 狀態(tài)的 Goroutine 標(biāo)記成可以被搶占 gp.preemptStop = true,表示該 G 可以被搶占。

下面我們看一下執(zhí)行搶占任務(wù)會(huì)調(diào)用的 runtime/proc.go的 preemptPark函數(shù):

preemptPark

func preemptPark(gp *g) {
	
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	gp.waitreason = waitReasonPreempted 
	casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
    // 使當(dāng)前 m 放棄 g,讓出線程
	dropg()
    // 修改當(dāng)前 Goroutine 的狀態(tài)到 _Gpreempted
	casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
    // 并繼續(xù)執(zhí)行調(diào)度
	schedule()
}

preemptPark 會(huì)修改當(dāng)前 Goroutine 的狀態(tài)到 _Gpreempted ,調(diào)用 dropg 讓出線程,最后調(diào)用 schedule 函數(shù)繼續(xù)執(zhí)行其他 Goroutine 的任務(wù)循環(huán)調(diào)度。

gopreempt_m

gopreempt_m 方法比起搶占更像是主動(dòng)讓權(quán),然后重新加入到執(zhí)行隊(duì)列中等待調(diào)度。

func gopreempt_m(gp *g) { 
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	...
  // 更新狀態(tài)為 _Grunnable
	casgstatus(gp, _Grunning, _Grunnable)
  // 使當(dāng)前 m 放棄 g,讓出線程
	dropg()
	lock(&sched.lock)
  // 重新加入到全局執(zhí)行隊(duì)列中
	globrunqput(gp)
	unlock(&sched.lock)
	// 并繼續(xù)執(zhí)行調(diào)度
	schedule()
}

搶占信號(hào)發(fā)送

搶占信號(hào)的發(fā)送是由 preemptM 進(jìn)行的。

這個(gè)函數(shù)在runtime/signal_unix.go文件中:

preemptM

const sigPreempt = _SIGURG

func preemptM(mp *m) {
	...
	if atomic.Cas(&mp.signalPending, 0, 1) { 
		
		// preemptM 向 M 發(fā)送搶占請(qǐng)求。
		// 接收到該請(qǐng)求后,如果正在運(yùn)行的 G 或 P 被標(biāo)記為搶占,并且 Goroutine 處于異步安全點(diǎn),
		// 它將搶占 Goroutine。
		signalM(mp, sigPreempt)
	}
}

preemptM 這個(gè)函數(shù)會(huì)調(diào)用 signalM 將在初始化的安裝的 _SIGURG 信號(hào)發(fā)送到指定的 M 上。

使用 preemptM 發(fā)送搶占信號(hào)的地方主要有下面幾個(gè):

  1. Go 后臺(tái)監(jiān)控 runtime.sysmon 檢測(cè)超時(shí)發(fā)送搶占信號(hào);

  2. Go GC 棧掃描發(fā)送搶占信號(hào);

  3. Go GC STW 的時(shí)候調(diào)用 preemptall 搶占所有 P,讓其暫停;

Go 后臺(tái)監(jiān)控執(zhí)行搶占

系統(tǒng)監(jiān)控 runtime.sysmon 會(huì)在循環(huán)中調(diào)用 runtime.retake搶占處于運(yùn)行或者系統(tǒng)調(diào)用中的處理器,該函數(shù)會(huì)遍歷運(yùn)行時(shí)的全局處理器。

系統(tǒng)監(jiān)控通過在循環(huán)中搶占主要是為了避免 G 占用 M 的時(shí)間過長(zhǎng)造成饑餓。

runtime.retake主要分為兩部分:

  1. 調(diào)用 preemptone 搶占當(dāng)前處理器;

  2. 調(diào)用 handoffp 讓出處理器的使用權(quán);

搶占當(dāng)前處理器

func retake(now int64) uint32 {
	n := 0
	 
	lock(&allpLock) 
	// 遍歷 allp 數(shù)組
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil { 
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// 調(diào)度次數(shù)
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				// 處理器上次調(diào)度時(shí)間
				pd.schedwhen = now
			// 搶占 G 的執(zhí)行,如果上一次觸發(fā)調(diào)度的時(shí)間已經(jīng)過去了 10ms
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)
				sysretake = true
			}
		}
		...
	}
	unlock(&allpLock)
	return uint32(n)
}

這一過程會(huì)獲取當(dāng)前 P 的狀態(tài),如果處于 _Prunning 或者 _Psyscall 狀態(tài)時(shí),并且上一次觸發(fā)調(diào)度的時(shí)間已經(jīng)過去了 10ms,那么會(huì)調(diào)用 preemptone 進(jìn)行搶占信號(hào)的發(fā)送,preemptone 在上面我們已經(jīng)講過了,這里就不再?gòu)?fù)述。

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

調(diào)用 handoffp 讓出處理器的使用權(quán)

func retake(now int64) uint32 {
	n := 0
	lock(&allpLock) 
	// 遍歷 allp 數(shù)組
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil { 
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		...
		if s == _Psyscall { 
			// 系統(tǒng)調(diào)用的次數(shù)
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				// 系統(tǒng)調(diào)用的時(shí)間
				pd.syscallwhen = now
				continue
			} 
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			} 
			unlock(&allpLock) 
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) { 
				n++
				_p_.syscalltick++
				// 讓出處理器的使用權(quán)
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

這一過程會(huì)判斷 P 的狀態(tài)如果處于 _Psyscall 狀態(tài)時(shí),會(huì)進(jìn)行一個(gè)判斷,有一個(gè)不滿足則調(diào)用 handoffp 讓出 P 的使用權(quán):

  1. runqempty(_p_) :判斷 P 的任務(wù)隊(duì)列是否為空;

  2. atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle):nmspinning 表示正在竊取 G 的數(shù)量,npidle 表示空閑 P 的數(shù)量,判斷是否存在空閑 P 和正在進(jìn)行調(diào)度竊取 G 的 P;

  3. pd.syscallwhen+10*1000*1000 > now:判斷是否系統(tǒng)調(diào)用時(shí)間超過了 10ms ;

Go GC 棧掃描發(fā)送搶占信號(hào)

GC 相關(guān)的內(nèi)容可以看這篇:《Go語(yǔ)言GC實(shí)現(xiàn)原理及源碼分析 https://www.luozhiyun.com/archives/475》。Go 在 GC 時(shí)對(duì) GC Root 進(jìn)行標(biāo)記的時(shí)候會(huì)掃描 G 的棧,掃描之前會(huì)調(diào)用 suspendG 掛起 G 的執(zhí)行才進(jìn)行掃描,掃描完畢之后再次調(diào)用 resumeG 恢復(fù)執(zhí)行。

該函數(shù)在:runtime/mgcmark.go:

markroot

func markroot(gcw *gcWork, i uint32) { 
	...
 	switch { 
	...
	// 掃描各個(gè) G 的棧
	default: 
		// 獲取需要掃描的 G
		var gp *g
		if baseStacks <= i && i < end {
			gp = allgs[i-baseStacks]
		} else {
			throw("markroot: bad index")
		} 
		...
		// 轉(zhuǎn)交給g0進(jìn)行掃描
		systemstack(func() {  
			...
			// 掛起 G,讓對(duì)應(yīng)的 G 停止運(yùn)行
			stopped := suspendG(gp)
			if stopped.dead {
				gp.gcscandone = true
				return
			}
			if gp.gcscandone {
				throw("g already scanned")
			}
			// 掃描g的棧
			scanstack(gp, gcw)
			gp.gcscandone = true
			// 恢復(fù)該 G 的執(zhí)行
			resumeG(stopped) 
		})
	}
}

markroot 在掃描棧之前會(huì)切換到 G0 轉(zhuǎn)交給g0進(jìn)行掃描,然后調(diào)用 suspendG 會(huì)判斷 G 的運(yùn)行狀態(tài),如果該 G 處于 運(yùn)行狀態(tài) _Grunning,那么會(huì)設(shè)置 preemptStop 為 true 并發(fā)送搶占信號(hào)。

該函數(shù)在:runtime/preempt.go:

suspendG

func suspendG(gp *g) suspendGState {
	...
	const yieldDelay = 10 * 1000

	var nextPreemptM int64
	for i := 0; ; i++ {
		switch s := readgstatus(gp); s { 
		... 
		case _Grunning:
			if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
				break
			}
			if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
				break
			}
			// 設(shè)置搶占字段
			gp.preemptStop = true
			gp.preempt = true
			gp.stackguard0 = stackPreempt
 
			asyncM2 := gp.m
			asyncGen2 := atomic.Load(&asyncM2.preemptGen)
			// asyncM 與 asyncGen 標(biāo)記的是循環(huán)里 上次搶占的信息,用來(lái)校驗(yàn)不能重復(fù)搶占
			needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
			asyncM = asyncM2
			asyncGen = asyncGen2

			casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
 
			if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync { 
				now := nanotime()
				// 限制搶占的頻率
				if now >= nextPreemptM {
					nextPreemptM = now + yieldDelay/2
					// 執(zhí)行搶占信號(hào)發(fā)送
					preemptM(asyncM)
				}
			}
		}
		...
	}
}

對(duì)于 suspendG 函數(shù)我只截取出了 G 在 _Grunning 狀態(tài)下的處理情況。該狀態(tài)下會(huì)將 preemptStop 設(shè)置為 true,也是唯一一個(gè)地方設(shè)置為 true 的地方。preemptStop 和搶占信號(hào)的執(zhí)行有關(guān),忘記的同學(xué)可以翻到上面的 asyncPreempt2 函數(shù)中。

Go GC StopTheWorld 搶占所有 P

Go GC STW 是通過 stopTheWorldWithSema 函數(shù)來(lái)執(zhí)行的,該函數(shù)在 runtime/proc.go:

stopTheWorldWithSema

func stopTheWorldWithSema() {
	_g_ := getg() 

	lock(&sched.lock)
	sched.stopwait = gomaxprocs
	// 標(biāo)記 gcwaiting,調(diào)度時(shí)看見此標(biāo)記會(huì)進(jìn)入等待
	atomic.Store(&sched.gcwaiting, 1)
	// 發(fā)送搶占信號(hào)
	preemptall() 
	// 暫停當(dāng)前 P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	...
	wait := sched.stopwait > 0
	unlock(&sched.lock)
	if wait {
		for {
			//  等待 100 us
			if notetsleep(&sched.stopnote, 100*1000) {
				noteclear(&sched.stopnote)
				break
			}
			// 再次進(jìn)行發(fā)送搶占信號(hào)
			preemptall()
		}
	}
	...
}

stopTheWorldWithSema 函數(shù)會(huì)調(diào)用 preemptall 對(duì)所有的 P 發(fā)送搶占信號(hào)。

preemptall 函數(shù)的文件位置在 runtime/proc.go:

preemptall

func preemptall() bool {
   res := false
   // 遍歷所有的 P
   for _, _p_ := range allp {
      if _p_.status != _Prunning {
         continue
      }
      // 對(duì)正在運(yùn)行的 P 發(fā)送搶占信號(hào)
      if preemptone(_p_) {
         res = true
      }
   }
   return res
}

preemptall 調(diào)用的 preemptone 會(huì)將 P 對(duì)應(yīng)的 M 中正在執(zhí)行的 G 并標(biāo)記為正在執(zhí)行搶占;最后會(huì)調(diào)用 preemptM 向 M 發(fā)送搶占信號(hào)。

該函數(shù)的文件位置在 runtime/proc.go:

preemptone

func preemptone(_p_ *p) bool {
	// 獲取 P 對(duì)應(yīng)的 M
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	// 獲取 M 正在執(zhí)行的 G
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	// 將 G 標(biāo)記為搶占
	gp.preempt = true

	// 在棧擴(kuò)張的時(shí)候會(huì)檢測(cè)是否被搶占
	gp.stackguard0 = stackPreempt

	// 請(qǐng)求該 P 的異步搶占
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	} 
	return true
}

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

具體的邏輯:

  1. 程序啟動(dòng)時(shí),在注冊(cè) _SIGURG 信號(hào)的處理函數(shù) runtime.doSigPreempt;

  2. 此時(shí)有一個(gè) M1 通過 signalM 函數(shù)向 M2 發(fā)送中斷信號(hào) _SIGURG;

  3. M2 收到信號(hào),操作系統(tǒng)中斷其執(zhí)行代碼,并切換到信號(hào)處理函數(shù)runtime.doSigPreempt;

  4. M2 調(diào)用 runtime.asyncPreempt 修改執(zhí)行的上下文,重新進(jìn)入調(diào)度循環(huán)進(jìn)而調(diào)度其他 G;

Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析

關(guān)于Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

文章標(biāo)題:Go語(yǔ)言基于信號(hào)搶占式調(diào)度的示例分析
網(wǎng)頁(yè)地址:http://muchs.cn/article22/pdgjcc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站云服務(wù)器、網(wǎng)站導(dǎo)航企業(yè)網(wǎng)站制作、搜索引擎優(yōu)化、用戶體驗(yàn)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁(yè)設(shè)計(jì)公司