python如何使用redis做隊(duì)列服務(wù)

這篇文章給大家介紹python如何使用redis做隊(duì)列服務(wù),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

創(chuàng)新互聯(lián)服務(wù)緊隨時(shí)代發(fā)展步伐,進(jìn)行技術(shù)革新和技術(shù)進(jìn)步,經(jīng)過十年的發(fā)展和積累,已經(jīng)匯集了一批資深網(wǎng)站策劃師、設(shè)計(jì)師、專業(yè)的網(wǎng)站實(shí)施團(tuán)隊(duì)以及高素質(zhì)售后服務(wù)人員,并且完全形成了一套成熟的業(yè)務(wù)流程,能夠完全依照客戶要求對(duì)網(wǎng)站進(jìn)行成都網(wǎng)站建設(shè)、成都做網(wǎng)站、建設(shè)、維護(hù)、更新和改版,實(shí)現(xiàn)客戶網(wǎng)站對(duì)外宣傳展示的首要目的,并為客戶企業(yè)品牌互聯(lián)網(wǎng)化提供全面的解決方案。

系統(tǒng)中引入消息隊(duì)列機(jī)制是對(duì)系統(tǒng)一個(gè)非常大的改善。例如一個(gè)web系統(tǒng)中,用戶做了某項(xiàng)操作后需要發(fā)送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發(fā)送完成后反饋給用戶,但是這樣可能會(huì)因?yàn)榫W(wǎng)絡(luò)的不確定性造成用戶長時(shí)間的等待從而影響用戶體驗(yàn)。

有些場(chǎng)景下是不可能使用同步方式等待完成的,那些需要后臺(tái)花費(fèi)大量時(shí)間的操作。例如極端例子,一個(gè)在線編譯系統(tǒng)任務(wù),后臺(tái)編譯完成需要30分鐘。這種場(chǎng)景的設(shè)計(jì)不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據(jù)情況再此反饋用戶與否。

另外適用消息隊(duì)列的情況是那些系統(tǒng)處理能力有限的情況下,先使用隊(duì)列機(jī)制把任務(wù)暫時(shí)存放起來,系統(tǒng)再一個(gè)個(gè)輪流處理掉排隊(duì)的任務(wù)。這樣在系統(tǒng)吞吐量不足的情況下也能穩(wěn)定的處理掉高并發(fā)的任務(wù)。

消息隊(duì)列可以用來做排隊(duì)機(jī)制,只要系統(tǒng)需要用到排隊(duì)機(jī)制的地方就可以使用消息隊(duì)列來作。

rabbitmq的優(yōu)先級(jí)做法

目前成熟的消息隊(duì)列產(chǎn)品有很多,著名的例如rabbitmq。它使用起來相對(duì)還是比較簡單的,功能也相對(duì)比較豐富,一般場(chǎng)合下是完全夠用的。但是有個(gè)很煩人的就是它不支持優(yōu)先級(jí)。 例如一個(gè)發(fā)郵件的任務(wù),某些特權(quán)用戶希望它的郵件能夠更加及時(shí)的發(fā)送出去,至少比普通用戶要優(yōu)先對(duì)待。默認(rèn)情況下rabbitmq是無法處理掉的,扔給rabbitmq的任務(wù)都是FIFO先進(jìn)先出。但是我們可以使用一些變通的技巧來支持這些優(yōu)先級(jí)。創(chuàng)建多個(gè)隊(duì)列,并為rabbitmq的消費(fèi)者設(shè)置相應(yīng)的路由規(guī)則。

例如默認(rèn)情況下有這樣一個(gè)隊(duì)列,我們拿list來模擬 [task1, task2, task3],消費(fèi)者輪流按照FIFO的原則一個(gè)個(gè)拿出task來處理掉。如果有高優(yōu)先級(jí)的任務(wù)進(jìn)來,它也只能跟在最后被處理[task1, task2, task3, higitask1]. 但是如果使用兩個(gè)隊(duì)列,一個(gè)高優(yōu)先級(jí)隊(duì)列,一個(gè)普通優(yōu)先級(jí)隊(duì)列。 普通優(yōu)先級(jí)[task1, task2, task3], 高優(yōu)先級(jí)[hightask1 ] 然后我們?cè)O(shè)置消費(fèi)者的路由讓消費(fèi)者隨機(jī)從任意隊(duì)列中取數(shù)據(jù)即可。

并且我們可以定義一個(gè)專門處理高優(yōu)先級(jí)隊(duì)列的消費(fèi)者,它空閑的時(shí)候也不處理低優(yōu)先級(jí)隊(duì)列的數(shù)據(jù)。這類似銀行的VIP柜臺(tái),普通客戶在銀行取號(hào)排隊(duì),一個(gè)VIP來了他雖然沒有從取號(hào)機(jī)里拿出一個(gè)排在普通會(huì)員前面的票,但是他還是可以更快地直接走VIP通道。

使用rabbitmq來做支持優(yōu)先級(jí)的消息隊(duì)列的話,就像是上面所述同銀行VIP會(huì)員一樣,走不同的通道。但是這種方式只是相對(duì)的優(yōu)先級(jí),做不到絕對(duì)的優(yōu)先級(jí)控制,例如我希望某一個(gè)優(yōu)先級(jí)高的任務(wù)在絕對(duì)意義上要比其他普通任務(wù)優(yōu)先處理掉,這樣上面的方案是行不通的。因?yàn)閞abbitmq的消費(fèi)者只知道再自己空閑的情況下從自己關(guān)心的隊(duì)列中“隨機(jī)”取某一個(gè)隊(duì)列里面的第一個(gè)數(shù)據(jù)來處理,它沒法控制優(yōu)先取找哪一個(gè)隊(duì)列?;蛘吒蛹?xì)粒度的優(yōu)先級(jí)控制。或者你系統(tǒng)里面設(shè)置的優(yōu)先級(jí)有10多種。這樣使用rabbitmq也是很難實(shí)現(xiàn)的。

但是如果使用redis來做隊(duì)列的話上面的需求都可以實(shí)現(xiàn)。

使用redis怎么做消息隊(duì)列

首先redis它的設(shè)計(jì)是用來做緩存的,但是由于它自身的某種特性使得他可以用來做消息隊(duì)列。它有幾個(gè)阻塞式的API可以使用,正是這些阻塞式的API讓他有做消息隊(duì)列的能力。

試想一下在”數(shù)據(jù)庫解決所有問題“的思路下,不使用消息隊(duì)列也是可以完成你的需求的。我們把任務(wù)全部存放在數(shù)據(jù)庫然后通過不斷的輪詢方式來取任務(wù)處理。這種做法雖然可以完成你的任務(wù)但是做法很粗劣。但是如果你的數(shù)據(jù)庫接口提供一個(gè)阻塞的方法那么就可以避免輪詢操作了,你的數(shù)據(jù)庫也可以用來做消息隊(duì)列,只不過目前的數(shù)據(jù)庫還沒有這樣的接口。 另外做消息隊(duì)列的其他特性例如FIFO也很容易實(shí)現(xiàn),只需要一個(gè)List對(duì)象從頭取數(shù)據(jù),從尾部塞數(shù)據(jù)即可實(shí)現(xiàn)。

redis能做消息隊(duì)列得益于他list對(duì)象blpop brpop接口以及Pub/Sub(發(fā)布/訂閱)的某些接口。他們都是阻塞版的,所以可以用來做消息隊(duì)列。

redis消息隊(duì)列優(yōu)先級(jí)的實(shí)現(xiàn)

一些基礎(chǔ)redis基礎(chǔ)知識(shí)的說明

redis> blpop tasklist 0
"im task 01"

這個(gè)例子使用blpop命令會(huì)阻塞方式地從tasklist列表中取頭一個(gè)數(shù)據(jù),最后一個(gè)參數(shù)就是等待超時(shí)的時(shí)間。如果設(shè)置為0則表示無限等待。另外redis存放的數(shù)據(jù)都只能是string類型,所以在任務(wù)傳遞的時(shí)候只能是傳遞字符串。我們只需要簡單的將負(fù)責(zé)數(shù)據(jù)序列化成json格式的字符串,然后消費(fèi)者那邊再轉(zhuǎn)換一下即可。

這里我們的示例語言使用python,鏈接redis的庫使用redis-py. 如果你有些編程基礎(chǔ)把它切換成自己喜歡的語言應(yīng)該是沒問題的。

  1. 1簡單的FIFO隊(duì)列

  2. import redis, time

  3. def handle(info):

  4.     print info

  5.     time.sleep(20)

  6. def main():

  7.     pool = redis.ConnectionPool(host='localhost', port=6379, db=0)

  8.     r = redis.Redis(connection_pool=pool)

  9.     while 1:

  10.         result = r.brpop('task', 0)            

  11.         handle(result[1])

  12. if __name__ == "__main__":

  13.     main()

上例子即使一個(gè)最簡單的消費(fèi)者,我們通過一個(gè)無限循環(huán)不斷地從redis的隊(duì)列中取數(shù)據(jù)。如果隊(duì)列中沒有數(shù)據(jù)則沒有超時(shí)的阻塞在那里,有數(shù)據(jù)則取出往下執(zhí)行。

一般情況取出來是個(gè)復(fù)雜的字符串,我們可能需要將其格式化后作為再傳給處理函數(shù),但是為了簡單我們的例子就是一個(gè)普通字符串。另外例子中的處理函數(shù)不做任何處理,僅僅sleep 用來模擬耗時(shí)的操作。

我們另開一個(gè)redis的客戶端來模擬生產(chǎn)者,自帶的客戶端就可以。多往tasklist 隊(duì)列里面塞上一些數(shù)據(jù)。

  1. redis 127.0.0.1:6379> LPUSH task "fuckin1"

  2. (integer) 1

  3. redis 127.0.0.1:6379> LPUSH task "fuckin2"

  4. (integer) 1

  5. redis 127.0.0.1:6379> LPUSH task "fuckin3"

  6. (integer) 2

  7. redis 127.0.0.1:6379> LPUSH task "fuckin4"

  8. (integer) 3

  9. redis 127.0.0.1:6379> LPUSH task "fuckin5"

  10. (integer) 4

  11. redis 127.0.0.1:6379> LPUSH task "fuckin6"

  12. (integer) 5

  13. redis 127.0.0.1:6379> LPUSH task "fuckin7"

  14. (integer) 6

  15. redis 127.0.0.1:6379> LPUSH task "fuckin8"

  16. (integer) 7

  17. redis 127.0.0.1:6379> LPUSH task "fuckin10"

  18. (integer) 8

  19. redis 127.0.0.1:6379> lrange task 0 -1

  20. 1) "fuckin10"

  21. 2) "fuckin8"

  22. 3) "fuckin7"

  23. 4) "fuckin6"

  24. 5) "fuckin5"

  25. 6) "fuckin4"

  26. 7) "fuckin3"


可以看到
[root@host-192-168-1-56 soft]# python duilie.py 
('task', 'fuckin1')
fuckin1
('task', 'fuckin2')    ---每個(gè)任務(wù)之間間隔20秒,20秒是模擬任務(wù)執(zhí)行時(shí)間
fuckin2
('task', 'fuckin3')
fuckin3
('task', 'fuckin4')
fuckin4
('task', 'fuckin5')
.。
。。。
。。。
('task', 'fuckin10')
fuckin10
。。。等待狀態(tài),等待新的任務(wù)

2.簡單優(yōu)先級(jí)的隊(duì)列

假設(shè)一種簡單的需求,只需要高優(yōu)先級(jí)的比低優(yōu)先級(jí)的任務(wù)率先處理掉。其他任務(wù)之間的順序一概不管,這種我們只需要在在遇到高優(yōu)先級(jí)任務(wù)的時(shí)候?qū)⑺疥?duì)列的前頭,而不是push到最后面即可。

因?yàn)槲覀兊年?duì)列是使用的redis的 list,所以很容易實(shí)現(xiàn)。遇到高優(yōu)先級(jí)的使用rpush 遇到低優(yōu)先級(jí)的使用lpush

redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'

隨后會(huì)看到,高優(yōu)先級(jí)的總是比低優(yōu)先級(jí)的率先執(zhí)行。但是這個(gè)方案的缺點(diǎn)是高優(yōu)先級(jí)的任務(wù)之間的執(zhí)行順序是先進(jìn)后出的。

3.較為完善的隊(duì)列

例子2中只是簡單的將高優(yōu)先級(jí)的任務(wù)塞到隊(duì)列最前面,低優(yōu)先級(jí)的塞到最后面。這樣保證不了高優(yōu)先級(jí)任務(wù)之間的順序。

假設(shè)當(dāng)所有的任務(wù)都是高優(yōu)先級(jí)的話,那么他們的執(zhí)行順序?qū)⑹窍喾吹摹_@樣明顯違背了隊(duì)列的FIFO原則。

不過只要稍加改進(jìn)就可以完善我們的隊(duì)列。

跟使用rabbitmq一樣,我們?cè)O(shè)置兩個(gè)隊(duì)列,一個(gè)高優(yōu)先級(jí)一個(gè)低優(yōu)先級(jí)的隊(duì)列。高優(yōu)先級(jí)任務(wù)放到高隊(duì)列中,低的放在低優(yōu)先隊(duì)列中。redis和rabbitmq不同的是它可以要求隊(duì)列消費(fèi)者從哪個(gè)隊(duì)列里面先讀。

def main():
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    r = redis.Redis(connection_pool=pool)
    while 1:
        result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
        handle(result[1])

上面的代碼,會(huì)阻塞地從'high_task_queue', 'low_task_queue'這兩個(gè)隊(duì)列里面取數(shù)據(jù),如果第一個(gè)沒有再從第二個(gè)里面取。 所以只需要將隊(duì)列消費(fèi)者做這樣的改進(jìn)便可以達(dá)到目的。

redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004

通過上面的測(cè)試看到,高優(yōu)先級(jí)的會(huì)被率先執(zhí)行,并且高優(yōu)先級(jí)之間也是保證了FIFO的原則。

這種方案我們可以支持不同階段的優(yōu)先級(jí)隊(duì)列,例如高中低三個(gè)級(jí)別或者更多的級(jí)別都可以。

4.優(yōu)先級(jí)級(jí)別很多的情況

假設(shè)有個(gè)這樣的需求,優(yōu)先級(jí)不是簡單的高中低或者0-10這些固定的級(jí)別。而是類似0-99999這么多級(jí)別。那么我們第三種方案將不太合適了。 雖然redis有sorted set這樣的可以排序的數(shù)據(jù)類型,看是很可惜它沒有阻塞版的接口。于是我們還是只能使用list類型通過其他方式來完成目的。

有個(gè)簡單的做法我們可以只設(shè)置一個(gè)隊(duì)列,并保證它是按照優(yōu)先級(jí)排序號(hào)的。然后通過二分查找法查找一個(gè)任務(wù)合適的位置,并通過 lset 命令插入到相應(yīng)的位置。 例如隊(duì)列里面包含著寫優(yōu)先級(jí)的任務(wù)[1, 3, 6, 8, 9, 14],當(dāng)有個(gè)優(yōu)先級(jí)為7的任務(wù)過來,我們通過自己的二分算法一個(gè)個(gè)從隊(duì)列里面取數(shù)據(jù)出來反和目標(biāo)數(shù)據(jù)比對(duì),計(jì)算出相應(yīng)的位置然后插入到指定地點(diǎn)即可。

因?yàn)槎植檎沂潜容^快的,并且redis本身也都在內(nèi)存中,理論上速度是可以保證的。但是如果說數(shù)據(jù)量確實(shí)很大的話我們也可以通過一些方式來調(diào)優(yōu)。

回想我們第三種方案,把第三種方案結(jié)合起來就會(huì)很大程度上減少開銷。例如數(shù)據(jù)量十萬的隊(duì)列,它們的優(yōu)先級(jí)也是隨機(jī)0-十萬的區(qū)間。我們可以設(shè)置10個(gè)或者100個(gè)不同的隊(duì)列,0-一萬的優(yōu)先級(jí)任務(wù)投放到1號(hào)隊(duì)列,一萬-二萬的任務(wù)投放到2號(hào)隊(duì)列。這樣將一個(gè)隊(duì)列按不同等級(jí)拆分后它單個(gè)隊(duì)列的數(shù)據(jù)就減少許多,這樣二分查找匹配的效率也會(huì)高一點(diǎn)。但是數(shù)據(jù)所占的資源基本是不變的,十萬數(shù)據(jù)該占多少內(nèi)存還是多少。只是系統(tǒng)里面多了一些隊(duì)列而已。

關(guān)于python如何使用redis做隊(duì)列服務(wù)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

新聞名稱:python如何使用redis做隊(duì)列服務(wù)
URL網(wǎng)址:http://muchs.cn/article28/ghhhcp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)、品牌網(wǎng)站制作、品牌網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、品牌網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)