Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
成都創(chuàng)新互聯(lián)主要從事網(wǎng)站制作、成都做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)三臺(tái),10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):18980820575
在時(shí)間輪中存儲(chǔ)任務(wù)的是一個(gè)環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表。定時(shí)任務(wù)列表是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng),其中封裝了真正的定時(shí)任務(wù)。
時(shí)間輪由多個(gè)時(shí)間格組成,每個(gè)時(shí)間格代表當(dāng)前時(shí)間輪的基本時(shí)間跨度(tickMs)。時(shí)間輪的時(shí)間格個(gè)數(shù)是固定的,可用 wheelSize 來(lái)表示,那么整個(gè)時(shí)間輪的總體時(shí)間跨度(interval)可以通過(guò)公式 tickMs×wheelSize 計(jì)算得出。
時(shí)間輪還有一個(gè)表盤指針(currentTime),用來(lái)表示時(shí)間輪當(dāng)前所處的時(shí)間,currentTime 是 tickMs 的整數(shù)倍。currentTime指向的地方是表示到期的時(shí)間格,表示需要處理的時(shí)間格所對(duì)應(yīng)的鏈表中的所有任務(wù)。
如下圖是一個(gè)tickMs為1s,wheelSize等于10的時(shí)間輪,每一格里面放的是一個(gè)定時(shí)任務(wù)鏈表,鏈表里面存有真正的任務(wù)項(xiàng):
初始情況下表盤指針 currentTime 指向時(shí)間格0,若時(shí)間輪的 tickMs 為 1ms 且 wheelSize 等于10,那么interval則等于10s。如下圖此時(shí)有一個(gè)定時(shí)為2s的任務(wù)插進(jìn)來(lái)會(huì)存放到時(shí)間格為2的任務(wù)鏈表中,用紅色標(biāo)記。隨著時(shí)間的不斷推移,指針 currentTime 不斷向前推進(jìn),如果過(guò)了2s,那么 currentTime 會(huì)指向時(shí)間格2的位置,會(huì)將此時(shí)間格的任務(wù)鏈表獲取出來(lái)處理。
如果當(dāng)前的指針 currentTime 指向的是2,此時(shí)如果插入一個(gè)9s的任務(wù)進(jìn)來(lái),那么新來(lái)的任務(wù)會(huì)服用原來(lái)的時(shí)間格鏈表,會(huì)存放到時(shí)間格1中
這里所講的時(shí)間輪都是簡(jiǎn)單時(shí)間輪,只有一層,總體時(shí)間范圍在 currentTime 和 currentTime+interval 之間。如果現(xiàn)在有一個(gè)15s的定時(shí)任務(wù)是需要重新開啟一個(gè)時(shí)間輪,設(shè)置一個(gè)時(shí)間跨度至少為15s的時(shí)間輪才夠用。但是這樣擴(kuò)充是沒(méi)有底線的,如果需要一個(gè)1萬(wàn)秒的時(shí)間輪,那么就需要一個(gè)這么大的數(shù)組去存放,不僅占用很大的內(nèi)存空間,而且也會(huì)因?yàn)樾枰闅v這么大的數(shù)組從而拉低效率。
因此引入了層級(jí)時(shí)間輪的概念。
如圖是一個(gè)兩層的時(shí)間輪,第二層時(shí)間輪也是由10個(gè)時(shí)間格組成,每個(gè)時(shí)間格的跨度是10s。第二層的時(shí)間輪的 tickMs 為第一層時(shí)間輪的 interval,即10s。每一層時(shí)間輪的 wheelSize 是固定的,都是10,那么第二層的時(shí)間輪的總體時(shí)間跨度 interval 為100s。
圖中展示了每個(gè)時(shí)間格對(duì)應(yīng)的過(guò)期時(shí)間范圍, 我們可以清晰地看到, 第二層時(shí)間輪的第0個(gè)時(shí)間格的過(guò)期時(shí)間范圍是 [0,9]。也就是說(shuō), 第二層時(shí)間輪的一個(gè)時(shí)間格就可以表示第一層時(shí)間輪的所有(10個(gè))時(shí)間格;
如果向該時(shí)間輪中添加一個(gè)15s的任務(wù),那么當(dāng)?shù)谝粚訒r(shí)間輪容納不下時(shí),進(jìn)入第二層時(shí)間輪,并插入到過(guò)期時(shí)間為[10,19]的時(shí)間格中。
隨著時(shí)間的流逝,當(dāng)原本15s的任務(wù)還剩下5s的時(shí)候,這里就有一個(gè)時(shí)間輪降級(jí)的操作,此時(shí)第一層時(shí)間輪的總體時(shí)間跨度已足夠,此任務(wù)被添加到第一層時(shí)間輪到期時(shí)間為5的時(shí)間格中,之后再經(jīng)歷5s后,此任務(wù)真正到期,最終執(zhí)行相應(yīng)的到期操作。
因?yàn)槲覀冞@個(gè)Go語(yǔ)言版本的時(shí)間輪代碼是仿照Kafka寫的,所以在具體實(shí)現(xiàn)時(shí)間輪 TimingWheel 時(shí)還有一些小細(xì)節(jié):
時(shí)間輪的時(shí)間格中每個(gè)鏈表會(huì)有一個(gè)root節(jié)點(diǎn)用于簡(jiǎn)化邊界條件。它是一個(gè)附加的鏈表節(jié)點(diǎn),該節(jié)點(diǎn)作為第一個(gè)節(jié)點(diǎn),它的值域中并不存儲(chǔ)任何東西,只是為了操作的方便而引入的;
除了第一層時(shí)間輪,其余高層時(shí)間輪的起始時(shí)間(startMs)都設(shè)置為創(chuàng)建此層時(shí)間輪時(shí)前面第一輪的 currentTime。每一層的 currentTime 都必須是 tickMs 的整數(shù)倍,如果不滿足則會(huì)將 currentTime 修剪為 tickMs 的整數(shù)倍。修剪方法為:currentTime = startMs - (startMs % tickMs);
Kafka 中的定時(shí)器只需持有 TimingWheel 的第一層時(shí)間輪的引用,并不會(huì)直接持有其他高層的時(shí)間輪,但每一層時(shí)間輪都會(huì)有一個(gè)引用(overflowWheel)指向更高一層的應(yīng)用;
Kafka 中的定時(shí)器使用了 DelayQueue 來(lái)協(xié)助推進(jìn)時(shí)間輪。在操作中會(huì)將每個(gè)使用到的時(shí)間格中每個(gè)鏈表都加入 DelayQueue,DelayQueue 會(huì)根據(jù)時(shí)間輪對(duì)應(yīng)的過(guò)期時(shí)間 expiration 來(lái)排序,最短 expiration 的任務(wù)會(huì)被排在 DelayQueue 的隊(duì)頭,通過(guò)單獨(dú)線程來(lái)獲取 DelayQueue 中到期的任務(wù);
type TimingWheel struct { // 時(shí)間跨度,單位是毫秒 tick int64 // in milliseconds // 時(shí)間輪個(gè)數(shù) wheelSize int64 // 總跨度 interval int64 // in milliseconds // 當(dāng)前指針指向時(shí)間 currentTime int64 // in milliseconds // 時(shí)間格列表 buckets []*bucket // 延遲隊(duì)列 queue *delayqueue.DelayQueue // 上級(jí)的時(shí)間輪引用 overflowWheel unsafe.Pointer // type: *TimingWheel exitC chan struct{} waitGroup waitGroupWrapper }
tick、wheelSize、interval、currentTime都比較好理解,buckets字段代表的是時(shí)間格列表,queue是一個(gè)延遲隊(duì)列,所有的任務(wù)都是通過(guò)延遲隊(duì)列來(lái)進(jìn)行觸發(fā),overflowWheel是上層時(shí)間輪的引用。
type bucket struct { // 任務(wù)的過(guò)期時(shí)間 expiration int64 mu sync.Mutex // 相同過(guò)期時(shí)間的任務(wù)隊(duì)列 timers *list.List }
bucket里面實(shí)際上封裝的是時(shí)間格里面的任務(wù)隊(duì)列,里面放入的是相同過(guò)期時(shí)間的任務(wù),到期后會(huì)將隊(duì)列timers拿出來(lái)進(jìn)行處理。這里有個(gè)有意思的地方是由于會(huì)有多個(gè)線程并發(fā)的訪問(wèn)bucket,所以需要用到原子類來(lái)獲取int64位的值,為了保證32位系統(tǒng)上面讀取64位數(shù)據(jù)的一致性,需要進(jìn)行64位對(duì)齊。具體的可以看這篇:https://www.luozhiyun.com/archives/429,講的是對(duì)內(nèi)存對(duì)齊的思考。
type Timer struct { // 到期時(shí)間 expiration int64 // in milliseconds // 要被執(zhí)行的具體任務(wù) task func() // Timer所在bucket的指針 b unsafe.Pointer // type: *bucket // bucket列表中對(duì)應(yīng)的元素 element *list.Element }
Timer是時(shí)間輪的最小執(zhí)行單元,是定時(shí)任務(wù)的封裝,到期后會(huì)調(diào)用task來(lái)執(zhí)行任務(wù)。
例如現(xiàn)在初始化一個(gè)tick是1s,wheelSize是10的時(shí)間輪:
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() } func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { // 將傳入的tick轉(zhuǎn)化成毫秒 tickMs := int64(tick / time.Millisecond) // 如果小于零,那么panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } // 設(shè)置開始時(shí)間 startMs := timeToMs(time.Now().UTC()) // 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), ) } func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { // 初始化buckets的大小 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket() } // 實(shí)例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, // currentTime必須是tickMs的倍數(shù),所以這里使用truncate進(jìn)行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), } }
初始化十分簡(jiǎn)單,大家可以看看上面的代碼注釋即可。
下面我們看看start方法:
func (tw *TimingWheel) Start() { // Poll會(huì)執(zhí)行一個(gè)無(wú)限循環(huán),將到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) }) }) // 開啟無(wú)限循環(huán)獲取queue中C的數(shù)據(jù) tw.waitGroup.Wrap(func() { for { select { // 從隊(duì)列里面出來(lái)的數(shù)據(jù)都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) // 時(shí)間輪會(huì)將當(dāng)前時(shí)間 currentTime 往前移動(dòng)到 bucket的到期時(shí)間 tw.advanceClock(b.Expiration()) // 取出bucket隊(duì)列的數(shù)據(jù),并調(diào)用addOrRun方法執(zhí)行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } }) }
這里使用了util封裝的一個(gè)Wrap方法,這個(gè)方法會(huì)起一個(gè)goroutines異步執(zhí)行傳入的函數(shù),具體的可以到我上面給出的鏈接去看源碼。
Start方法會(huì)啟動(dòng)兩個(gè)goroutines。第一個(gè)goroutines用來(lái)調(diào)用延遲隊(duì)列的queue的Poll方法,這個(gè)方法會(huì)一直循環(huán)獲取隊(duì)列里面的數(shù)據(jù),然后將到期的數(shù)據(jù)放入到queue的C管道中;第二個(gè)goroutines會(huì)無(wú)限循環(huán)獲取queue中C的數(shù)據(jù),如果C中有數(shù)據(jù)表示已經(jīng)到期,那么會(huì)先調(diào)用advanceClock方法將當(dāng)前時(shí)間 currentTime 往前移動(dòng)到 bucket的到期時(shí)間,然后再調(diào)用Flush方法取出bucket中的隊(duì)列,并調(diào)用addOrRun方法執(zhí)行。
func (tw *TimingWheel) advanceClock(expiration int64) { currentTime := atomic.LoadInt64(&tw.currentTime) // 過(guò)期時(shí)間大于等于(當(dāng)前時(shí)間+tick) if expiration >= currentTime+tw.tick { // 將currentTime設(shè)置為expiration,從而推進(jìn)currentTime currentTime = truncate(expiration, tw.tick) atomic.StoreInt64(&tw.currentTime, currentTime) // Try to advance the clock of the overflow wheel if present // 如果有上層時(shí)間輪,那么遞歸調(diào)用上層時(shí)間輪的引用 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel != nil { (*TimingWheel)(overflowWheel).advanceClock(currentTime) } } }
advanceClock方法會(huì)根據(jù)到期時(shí)間來(lái)從新設(shè)置currentTime,從而推進(jìn)時(shí)間輪前進(jìn)。
func (b *bucket) Flush(reinsert func(*Timer)) { var ts []*Timer b.mu.Lock() // 循環(huán)獲取bucket隊(duì)列節(jié)點(diǎn) for e := b.timers.Front(); e != nil; { next := e.Next() t := e.Value.(*Timer) // 將頭節(jié)點(diǎn)移除bucket隊(duì)列 b.remove(t) ts = append(ts, t) e = next } b.mu.Unlock() b.SetExpiration(-1) // TODO: Improve the coordination with b.Add() for _, t := range ts { reinsert(t) } }
Flush方法會(huì)根據(jù)bucket里面timers列表進(jìn)行遍歷插入到ts數(shù)組中,然后調(diào)用reinsert方法,這里是調(diào)用的addOrRun方法。
func (tw *TimingWheel) addOrRun(t *Timer) { // 如果已經(jīng)過(guò)期,那么直接執(zhí)行 if !tw.add(t) { // 異步執(zhí)行定時(shí)任務(wù) go t.task() } }
addOrRun會(huì)調(diào)用add方法檢查傳入的定時(shí)任務(wù)Timer是否已經(jīng)到期,如果到期那么異步調(diào)用task方法直接執(zhí)行。add方法我們下面會(huì)接著分析。
整個(gè)start執(zhí)行流程如圖:
start方法回啟動(dòng)一個(gè)goroutines調(diào)用poll來(lái)處理DelayQueue中到期的數(shù)據(jù),并將數(shù)據(jù)放入到管道C中;
start方法啟動(dòng)第二個(gè)goroutines方法會(huì)循環(huán)獲取DelayQueue中管道C的數(shù)據(jù),管道C中實(shí)際上存放的是一個(gè)bucket,然后遍歷bucket的timers列表,如果任務(wù)已經(jīng)到期,那么異步執(zhí)行,沒(méi)有到期則重新放入到DelayQueue中。
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() // 添加任務(wù) tw.AfterFunc(time.Second*15, func() { fmt.Println("The timer fires") exitC <- time.Now().UTC() }) }
我們通過(guò)AfterFunc方法添加一個(gè)15s的定時(shí)任務(wù),如果到期了,那么執(zhí)行傳入的函數(shù)。
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer { t := &Timer{ expiration: timeToMs(time.Now().UTC().Add(d)), task: f, } tw.addOrRun(t) return t }
AfterFunc方法回根據(jù)傳入的任務(wù)到期時(shí)間,以及到期需要執(zhí)行的函數(shù)封裝成Timer,調(diào)用addOrRun方法。addOrRun方法我們上面已經(jīng)看過(guò)了,會(huì)根據(jù)到期時(shí)間來(lái)決定是否需要執(zhí)行定時(shí)任務(wù)。
下面我們來(lái)看一下add方法:
func (tw *TimingWheel) add(t *Timer) bool { currentTime := atomic.LoadInt64(&tw.currentTime) // 已經(jīng)過(guò)期 if t.expiration < currentTime+tw.tick { // Already expired return false // 到期時(shí)間在第一層環(huán)內(nèi) } else if t.expiration < currentTime+tw.interval { // Put it into its own bucket // 獲取時(shí)間輪的位置 virtualID := t.expiration / tw.tick b := tw.buckets[virtualID%tw.wheelSize] // 將任務(wù)放入到bucket隊(duì)列中 b.Add(t) // 如果是相同的時(shí)間,那么返回false,防止被多次插入到隊(duì)列中 if b.SetExpiration(virtualID * tw.tick) { // 將該bucket加入到延遲隊(duì)列中 tw.queue.Offer(b, b.Expiration()) } return true } else { // Out of the interval. Put it into the overflow wheel // 如果放入的到期時(shí)間超過(guò)第一層時(shí)間輪,那么放到上一層中去 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel == nil { atomic.CompareAndSwapPointer( &tw.overflowWheel, nil, // 需要注意的是,這里tick變成了interval unsafe.Pointer(newTimingWheel( tw.interval, tw.wheelSize, currentTime, tw.queue, )), ) overflowWheel = atomic.LoadPointer(&tw.overflowWheel) } // 往上遞歸 return (*TimingWheel)(overflowWheel).add(t) } }
add方法根據(jù)到期時(shí)間來(lái)分成了三部分,第一部分是小于當(dāng)前時(shí)間+tick,表示已經(jīng)到期,那么返回false執(zhí)行任務(wù)即可;
第二部分的判斷會(huì)根據(jù)expiration是否小于時(shí)間輪的跨度,如果小于的話表示該定時(shí)任務(wù)可以放入到當(dāng)前時(shí)間輪中,通過(guò)取模找到buckets對(duì)應(yīng)的時(shí)間格并放入到bucket隊(duì)列中,SetExpiration方法會(huì)根據(jù)傳入的參數(shù)來(lái)判斷是否已經(jīng)執(zhí)行過(guò)延遲隊(duì)列的Offer方法,防止重復(fù)插入;
第三部分表示該定時(shí)任務(wù)的時(shí)間跨度超過(guò)了當(dāng)前時(shí)間輪,需要升級(jí)到上一層的時(shí)間輪中。需要注意的是,上一層的時(shí)間輪的tick是當(dāng)前時(shí)間輪的interval,延遲隊(duì)列還是同一個(gè),然后設(shè)置為指針overflowWheel,并調(diào)用add方法往上層遞歸。
到這里時(shí)間輪已經(jīng)講完了,不過(guò)還有需要注意的地方,我們?cè)谟蒙厦娴臅r(shí)間輪實(shí)現(xiàn)中,使用了DelayQueue加環(huán)形隊(duì)列的方式實(shí)現(xiàn)了時(shí)間輪。對(duì)定時(shí)任務(wù)項(xiàng)的插入和刪除操作而言,TimingWheel時(shí)間復(fù)雜度為 O(1),在DelayQueue中的隊(duì)列使用的是優(yōu)先隊(duì)列,時(shí)間復(fù)雜度是O(log n),但是由于buckets列表實(shí)際上是非常小的,所以并不會(huì)影響性能。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。
文章題目:Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪
網(wǎng)站URL:http://muchs.cn/article14/gdcgge.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)、虛擬主機(jī)、企業(yè)網(wǎng)站制作、服務(wù)器托管、網(wǎng)站設(shè)計(jì)公司、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)