[RabbitMQ+Python入門(mén)經(jīng)典]兔子和兔子窩

RabbitMQ作為一個(gè)工業(yè)級(jí)的消息隊(duì)列服務(wù)器,在其客戶(hù)端手冊(cè)列表的Python段當(dāng)中推薦了一篇blog,作為RabbitMQ+Python的入門(mén)手冊(cè)再合適不過(guò)了。不過(guò),正如其標(biāo)題Rabbit and Warrens(兔 子和養(yǎng)兔場(chǎng))一樣,這篇英文寫(xiě)的相當(dāng)俏皮,以至于對(duì)于我等非英文讀者來(lái)說(shuō)不像一般的技術(shù)文檔那么好懂,所以,翻譯一下吧。翻譯過(guò)了,希望其他人可以少用一 些時(shí)間。翻譯水平有限,不可能像原文一樣俏皮,部分地方可能就意譯了,希望以容易懂為準(zhǔn)。想看看老外的幽默的,推薦去看原文,其實(shí),也不是那么難理解……

成都創(chuàng)新互聯(lián)專(zhuān)注為客戶(hù)提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都做網(wǎng)站、網(wǎng)站制作、資中網(wǎng)絡(luò)推廣、小程序開(kāi)發(fā)、資中網(wǎng)絡(luò)營(yíng)銷(xiāo)、資中企業(yè)策劃、資中品牌公關(guān)、搜索引擎seo、人物專(zhuān)訪(fǎng)、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們大的嘉獎(jiǎng);成都創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供資中建站搭建服務(wù),24小時(shí)服務(wù)熱線(xiàn):13518219792,官方網(wǎng)址:muchs.cn

當(dāng)時(shí)我們的動(dòng)機(jī)很簡(jiǎn)單:從生產(chǎn)環(huán)境的電子郵件處理流程當(dāng)中分支出一個(gè)特定的離線(xiàn)分析流程。我們開(kāi)始用的MySQL,將要處理的東西放在表里面,另一個(gè)程序從中取。不過(guò)很快,這種設(shè)計(jì)的丑陋之處就顯現(xiàn)出來(lái)了…… 你想要多個(gè)程序從一個(gè)隊(duì)列當(dāng)中取數(shù)據(jù)來(lái)處理?沒(méi)問(wèn)題,我們硬編碼程序的個(gè)數(shù)好了……什么?還要能夠允許程序動(dòng)態(tài)地增加和減少的時(shí)候動(dòng)態(tài)進(jìn)行壓力分配?

是的,當(dāng)年我們想的簡(jiǎn)單的東西(做一個(gè)分支處理)逐漸變成了一個(gè)棘手的問(wèn)題。以前拿著錘子(MySQL)看所有東西都是釘子(表)的年代是多么美好……

在搜索了一下之后,我們走進(jìn)了消息隊(duì)列(message queue)的大門(mén)。不不,我們當(dāng)然知道消息隊(duì)列是什么,我們可是以做電子郵件程序謀生的。我們實(shí)現(xiàn)過(guò)各種各樣的專(zhuān)業(yè)的,高速的內(nèi)存隊(duì)列用來(lái)做電子郵件處理。我們不知道的是那一大類(lèi)現(xiàn)成的、通用的消息隊(duì)列(MQ)服務(wù)器——無(wú)論是用什么語(yǔ)言寫(xiě)出的,不需要復(fù)雜的裝配的,可以自然的在網(wǎng)絡(luò)上的應(yīng)用程序之間傳送數(shù)據(jù)的一類(lèi)程序。不用我們自己寫(xiě)?看看再說(shuō)。

讓大家看看你們的Queue吧……

過(guò)去的4年里,人們寫(xiě)了有好多好多的開(kāi)源的MQ服務(wù)器啊。其中大多數(shù)都是某公司例如LiveJournal寫(xiě)出來(lái)用來(lái)解決特定問(wèn)題的。它們的確不關(guān)心上面跑的是什么類(lèi)型的消息,不過(guò)他們的設(shè)計(jì)思想通常是和創(chuàng)建者息息相關(guān)的(消息的持久化,崩潰恢復(fù)等通常不在他們考慮范圍內(nèi))。不過(guò),有三個(gè)專(zhuān)門(mén)設(shè)計(jì)用來(lái)做及其靈活的消息隊(duì)列的程序值得關(guān)注:

· Apache ActiveMQ

· ZeroMQ

· RabbitMQ

Apache ActiveMQ 曝光率高,不過(guò)看起來(lái)它有些問(wèn)題,可能會(huì)造成丟消息。不可接受,下一個(gè)。

ZeroMQ 和 RabbitMQ 都支持一個(gè)開(kāi)源的消息協(xié)議,成為AMQP。AMQP的一個(gè)優(yōu)點(diǎn)是它是一個(gè)靈活和開(kāi)放的協(xié)議,以便和另外兩個(gè)商業(yè)化的Message Queue (IBM和Tibco)競(jìng)爭(zhēng),很好。不過(guò)ZeroMQ不支持消息持久化和崩潰恢復(fù),不太好。剩下的只有RabbitMQ了。如果你不在意消息持久化和崩潰恢復(fù),試試ZeroMQ吧,延遲很低,而且支持靈活的拓?fù)洹?/p>

剩下的只有這個(gè)吃胡蘿卜的家伙了……

當(dāng)我讀到它是用Erlang寫(xiě)的時(shí)候,RabbitMQ震了我一下。Erlang是愛(ài)立信開(kāi)發(fā)的高度并行的語(yǔ)言,用來(lái)跑在電話(huà)交換機(jī)上。是的,那些要求6個(gè)9的在線(xiàn)時(shí)間的東西。在Erlang當(dāng)中,充斥著大量輕量進(jìn)程,它們之間用消息傳遞來(lái)通信。聽(tīng)起來(lái)思路和我們用消息隊(duì)列的思路是一樣的,不是么?

而且,RabbitMQ支持持久化。是的,如果RabbitMQ死掉了,消息并不會(huì)丟失,當(dāng)隊(duì)列重啟,一切都會(huì)回來(lái)。而且,正如在DigiTar(注:原文作者的公司)做事情期望的那樣,它可以和Python無(wú)縫結(jié)合。除此之外,RabbitMQ的文檔相當(dāng)?shù)?hellip;…恐怖。如果你懂AMQP,這些文檔還好,但是有多少人懂AMQP?這些文檔就像MySQL的文檔假設(shè)你已經(jīng)懂了SQL一樣……不過(guò)沒(méi)關(guān)系啦。

好了,廢話(huà)少說(shuō)。這里是花了一周時(shí)間閱讀關(guān)于AMQP和關(guān)于它如何在RabbitMQ上工作的文檔之后的一個(gè)總結(jié),還有,怎么在Python當(dāng)中使用。

開(kāi)始吧

AMQP當(dāng)中有四個(gè)概念非常重要:虛擬主機(jī)(virtual host),交換機(jī)(exchange),隊(duì)列(queue)和綁定(binding)。一個(gè)虛擬主機(jī)持有一組交換機(jī)、隊(duì)列和綁定。為什么需要多個(gè)虛擬主機(jī)呢?很簡(jiǎn)單,RabbitMQ當(dāng)中,用戶(hù)只能在虛擬主機(jī)的粒度進(jìn)行權(quán)限控制。因此,如果需要禁止A組訪(fǎng)問(wèn)B組的交換機(jī)/隊(duì)列/綁定,必須為A和B分別創(chuàng)建一個(gè)虛擬主機(jī)。每一個(gè)RabbitMQ服務(wù)器都有一個(gè)默認(rèn)的虛擬主機(jī)“/”。如果這就夠了,那現(xiàn)在就可以開(kāi)始了。

交換機(jī),隊(duì)列,還有綁定……天哪!

剛開(kāi)始我思維的列車(chē)就是在這里脫軌的…… 這些鬼東西怎么結(jié)合起來(lái)的?

隊(duì)列(Queues)是你的消息(messages)的終點(diǎn),可以理解成裝消息的容器。消息就一直在里面,直到有客戶(hù)端(也就是消費(fèi)者,Consumer)連接到這個(gè)隊(duì)列并且將其取走為止。不過(guò)。你可以將一個(gè)隊(duì)列配置成這樣的:一旦消息進(jìn)入這個(gè)隊(duì)列,biu~,它就煙消云散了。這個(gè)有點(diǎn)跑題了……

需要記住的是,隊(duì)列是由消費(fèi)者(Consumer)通過(guò)程序建立的,不是通過(guò)配置文件或者命令行工具。這沒(méi)什么問(wèn)題,如果一個(gè)消費(fèi)者試圖創(chuàng)建一個(gè)已經(jīng)存在的隊(duì)列,RabbitMQ就會(huì)起來(lái)拍拍他的腦袋,笑一笑,然后忽略這個(gè)請(qǐng)求。因此你可以將消息隊(duì)列的配置寫(xiě)在應(yīng)用程序的代碼里面。這個(gè)概念不錯(cuò)。

OK,你已經(jīng)創(chuàng)建并且連接到了你的隊(duì)列,你的消費(fèi)者程序正在百無(wú)聊賴(lài)的敲著手指等待消息的到來(lái),敲啊,敲啊…… 沒(méi)有消息。發(fā)生了什么?你當(dāng)然需要先把一個(gè)消息放進(jìn)隊(duì)列才行。不過(guò)要做這個(gè),你需要一個(gè)交換機(jī)(Exchange)……

交換機(jī)可以理解成具有路由表的路由程序,僅此而已。每個(gè)消息都有一個(gè)稱(chēng)為路由鍵(routing key)的屬性,就是一個(gè)簡(jiǎn)單的字符串。交換機(jī)當(dāng)中有一系列的綁定(binding),即路由規(guī)則(routes),例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊(duì)列當(dāng)中去。先不討論這個(gè),我們有點(diǎn)超前了。

你的消費(fèi)者程序要負(fù)責(zé)創(chuàng)建你的交換機(jī)(復(fù)數(shù))。啥?你是說(shuō)你可以有多個(gè)交換機(jī)?是的,這個(gè)可以有,不過(guò)為啥?很簡(jiǎn)單,每個(gè)交換機(jī)在自己獨(dú)立的進(jìn)程當(dāng)中執(zhí)行,因此增加多個(gè)交換機(jī)就是增加多個(gè)進(jìn)程,可以充分利用服務(wù)器上的CPU核以便達(dá)到更高的效率。例如,在一個(gè)8核的服務(wù)器上,可以創(chuàng)建5個(gè)交換機(jī)來(lái)用5個(gè)核,另外3個(gè)核留下來(lái)做消息處理。類(lèi)似的,在RabbitMQ的集群當(dāng)中,你可以用類(lèi)似的思路來(lái)擴(kuò)展交換機(jī)一邊獲取更高的吞吐量。

OK,你已經(jīng)創(chuàng)建了一個(gè)交換機(jī)。但是他并不知道要把消息送到哪個(gè)隊(duì)列。你需要路由規(guī)則,即綁定(binding)。一個(gè)綁定就是一個(gè)類(lèi)似這樣的規(guī)則:將交換機(jī)“desert(沙漠)”當(dāng)中具有路由鍵“阿里巴巴”的消息送到隊(duì)列“hideout(山洞)”里面去。換句話(huà)說(shuō),一個(gè)綁定就是一個(gè)基于路由鍵將交換機(jī)和隊(duì)列連接起來(lái)的路由規(guī)則。例如,具有路由鍵“audit”的消息需要被送到兩個(gè)隊(duì)列,“log-forever”和“alert-the-big-dude”。要做到這個(gè),就需要?jiǎng)?chuàng)建兩個(gè)綁定,每個(gè)都連接一個(gè)交換機(jī)和一個(gè)隊(duì)列,兩者都是由“audit”路由鍵觸發(fā)。在這種情況下,交換機(jī)會(huì)復(fù)制一份消息并且把它們分別發(fā)送到兩個(gè)隊(duì)列當(dāng)中。交換機(jī)不過(guò)就是一個(gè)由綁定構(gòu)成的路由表。

現(xiàn)在復(fù)雜的東西來(lái)了:交換機(jī)有多種類(lèi)型。他們都是做路由的,不過(guò)接受不同類(lèi)型的綁定。為什么不創(chuàng)建一種交換機(jī)來(lái)處理所有類(lèi)型的路由規(guī)則呢?因?yàn)槊糠N規(guī)則用來(lái)做匹配分子的CPU開(kāi)銷(xiāo)是不同的。例如,一個(gè)“topic”類(lèi)型的交換機(jī)試圖將消息的路由鍵與類(lèi)似“dogs.*”的模式進(jìn)行匹配。匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類(lèi)型的交換機(jī))要消耗更多的CPU。如果你不需要“topic”類(lèi)型的交換機(jī)帶來(lái)的靈活性,你可以通過(guò)使用“direct”類(lèi)型的交換機(jī)獲取更高的處理效率。那么有哪些類(lèi)型,他們又是怎么處理的呢?

Fanout Exchange– 不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。

Direct Exchange– 處理路由鍵。需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog。

Topic Exchange– 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會(huì)匹配到“audit.irs”。我在RedHat的朋友做了一張不錯(cuò)的圖,來(lái)表明topic交換機(jī)是如何工作的:

Source:Red Hat Messaging Tutorial: 1.3 Topic Exchange

持久化這些小東西們

你花了大量的時(shí)間來(lái)創(chuàng)建隊(duì)列、交換機(jī)和綁定,然后,砰~服務(wù)器程序掛了。你的隊(duì)列、交換機(jī)和綁定怎么樣了?還有,放在隊(duì)列里面但是尚未處理的消息們呢?

放松~如果你是用默認(rèn)參數(shù)構(gòu)造的這一切的話(huà),那么,他們,都,biu~,灰飛煙滅了。是的,RabbitMQ重啟之后會(huì)干凈的像個(gè)新生兒。你必須重做所有的一切,亡羊補(bǔ)牢,如何避免將來(lái)再度發(fā)生此類(lèi)杯具?

隊(duì)列和交換機(jī)有一個(gè)創(chuàng)建時(shí)候指定的標(biāo)志durable,直譯叫做堅(jiān)固的。durable的唯一含義就是具有這個(gè)標(biāo)志的隊(duì)列和交換機(jī)會(huì)在重啟之后重新建立,它不表示說(shuō)在隊(duì)列當(dāng)中的消息會(huì)在重啟后恢復(fù)。那么如何才能做到不只是隊(duì)列和交換機(jī),還有消息都是持久的呢?

但是首先一個(gè)問(wèn)題是,你真的需要消息是持久的嗎?對(duì)于一個(gè)需要在重啟之后回復(fù)的消息來(lái)說(shuō),它需要被寫(xiě)入到磁盤(pán)上,而即使是最簡(jiǎn)單的磁盤(pán)操作也是要消耗時(shí)間的。如果和消息的內(nèi)容相比,你更看重的是消息處理的速度,那么不要使用持久化的消息。不過(guò)對(duì)于我們@DigiTar來(lái)說(shuō),持久化很重要。

當(dāng)你將消息發(fā)布到交換機(jī)的時(shí)候,可以指定一個(gè)標(biāo)志“Delivery Mode”(投遞模式)。根據(jù)你使用的AMQP的庫(kù)不同,指定這個(gè)標(biāo)志的方法可能不太一樣(我們后面會(huì)討論如何用Python搞定)。簡(jiǎn)單的說(shuō),就是將Delivery Mode設(shè)置成2,也就是持久的(persistent)即可。一般的AMQP庫(kù)都是將Delivery Mode設(shè)置成1,也就是非持久的。所以要持久化消息的步驟如下:

1. 將交換機(jī)設(shè)成 durable。

2. 將隊(duì)列設(shè)成 durable。

3. 將消息的 Delivery Mode 設(shè)置成2 。

就這樣,不是很復(fù)雜,起碼沒(méi)有造火箭復(fù)雜,不過(guò)也有可能犯點(diǎn)小錯(cuò)誤。

下面還要羅嗦一個(gè)東西……綁定(Bindings)怎么辦?我們無(wú)法在創(chuàng)建綁定的時(shí)候設(shè)置成durable。沒(méi)問(wèn)題,如果你綁定了一個(gè)durable的隊(duì)列和一個(gè)durable的交換機(jī),RabbitMQ會(huì)自動(dòng)保留這個(gè)綁定。類(lèi)似的,如果刪除了某個(gè)隊(duì)列或交換機(jī)(無(wú)論是不是durable),依賴(lài)它的綁定都會(huì)自動(dòng)刪除。

注意兩點(diǎn):

· RabbitMQ 不允許你綁定一個(gè)非堅(jiān)固(non-durable)的交換機(jī)和一個(gè)durable的隊(duì)列。反之亦然。要想成功必須隊(duì)列和交換機(jī)都是durable的。

· 一旦創(chuàng)建了隊(duì)列和交換機(jī),就不能修改其標(biāo)志了。例如,如果創(chuàng)建了一個(gè)non-durable的隊(duì)列,然后想把它改變成durable的,唯一的辦法就是刪除這個(gè)隊(duì)列然后重現(xiàn)創(chuàng)建。因此,最好仔細(xì)檢查創(chuàng)建的標(biāo)志。

開(kāi)始喂蛇了~

【譯注】說(shuō)喂蛇是因?yàn)镻ython的圖標(biāo)是條蛇。

AMQP的一個(gè)空白地帶是如何在Python當(dāng)中使用。對(duì)于其他語(yǔ)言有一大坨材料。

但是對(duì)Python老兄來(lái)說(shuō),你需要花點(diǎn)時(shí)間來(lái)挖掘一下。所以我寫(xiě)了這個(gè),這樣別的家伙們就不需要經(jīng)歷我這種抓狂的過(guò)程了。

首先,我們需要一個(gè)Python的AMQP庫(kù)。有兩個(gè)可選:

· py-amqplib– 通用的AMQP

· txAMQP– 使用Twisted框架的AMQP庫(kù),因此允許異步I/O。

根據(jù)你的需求,py-amqplib或者txAMQP都是可以的。因?yàn)槭腔赥wisted的,txAMQP可以保證用異步IO構(gòu)建超高性能的AMQP程序。但是Twisted編程本身就是一個(gè)很大的主題……因此清晰起見(jiàn),我們打算用 py-amqplib。更新:請(qǐng)參見(jiàn)Esteve Fernandez關(guān)于txAMQP的使用和代碼樣例的回復(fù)。

AMQP支持在一個(gè)TCP連接上啟用多個(gè)MQ通信channel,每個(gè)channel都可以被應(yīng)用作為通信流。每個(gè)AMQP程序至少要有一個(gè)連接和一個(gè)channel。

fromamqplibimportclient_0_8asamqp conn=amqp.Connection(host="localhost:5672",userid="guest", password="guest",virtual_host="/",insist=False) chan=conn.channel()

每個(gè)channel都被分配了一個(gè)整數(shù)標(biāo)識(shí),自動(dòng)由Connection()類(lèi)的.channel()方法維護(hù)?;蛘?,你可以使用.channel(x)來(lái)指定channel標(biāo)識(shí),其中x是你想要使用的channel標(biāo)識(shí)。通常情況下,推薦使用.channel()方法來(lái)自動(dòng)分配channel標(biāo)識(shí),以便防止沖突。

現(xiàn)在我們已經(jīng)有了一個(gè)可以用的連接和channel?,F(xiàn)在,我們的代碼將分成兩個(gè)應(yīng)用,生產(chǎn)者(producer)和消費(fèi)者(consumer)。我們先創(chuàng)建一個(gè)消費(fèi)者程序,他會(huì)創(chuàng)建一個(gè)叫做“po_box”的隊(duì)列和一個(gè)叫“sorting_room”的交換機(jī):

chan.queue_declare(queue="po_box",durable=True, exclusive=False,auto_delete=False) chan.exchange_declare(exchange="sorting_room",type="direct",durable=True, auto_delete=False,)

這段代碼干了啥?首先,它創(chuàng)建了一個(gè)名叫“po_box”的隊(duì)列,它是durable的(重啟之后會(huì)重新建立),并且最后一個(gè)消費(fèi)者斷開(kāi)的時(shí)候不會(huì)自動(dòng)刪除(auto_delete=False)。在創(chuàng)建durable的隊(duì)列(或者交換機(jī))的時(shí)候,將auto_delete設(shè)置成false是很重要的,否則隊(duì)列將會(huì)在最后一個(gè)消費(fèi)者斷開(kāi)的時(shí)候消失,與durable與否無(wú)關(guān)。如果將durable和auto_delete都設(shè)置成True,只有尚有消費(fèi)者活動(dòng)的隊(duì)列可以在RabbitMQ意外崩潰的時(shí)候自動(dòng)恢復(fù)。

(你可以注意到了另一個(gè)標(biāo)志,稱(chēng)為“exclusive”。如果設(shè)置成True,只有創(chuàng)建這個(gè)隊(duì)列的消費(fèi)者程序才允許連接到該隊(duì)列。這種隊(duì)列對(duì)于這個(gè)消費(fèi)者程序是私有的)。

還有另一個(gè)交換機(jī)聲明,創(chuàng)建了一個(gè)名字叫“sorting_room”的交換機(jī)。auto_delete和durable的含義和隊(duì)列是一樣的。但是,.excange_declare() 還有另外一個(gè)參數(shù)叫做type,用來(lái)指定要?jiǎng)?chuàng)建的交換機(jī)的類(lèi)型(如前面列出的):fanout,directtopic.

到此為止,你已經(jīng)有了一個(gè)可以接收消息的隊(duì)列和一個(gè)可以發(fā)送消息的交換機(jī)。不過(guò)我們需要?jiǎng)?chuàng)建一個(gè)綁定,把它們連接起來(lái)。

chan.queue_bind(queue=”po_box”,exchange=”sorting_room”, routing_key=”jason”)

這個(gè)綁定的過(guò)程非常直接。任何送到交換機(jī)“sorting_room”的具有路由鍵“jason” 的消息都被路由到名為“po_box” 的隊(duì)列。

現(xiàn)在,你有兩種方法從隊(duì)列當(dāng)中取出消息。第一個(gè)是調(diào)用chan.basic_get(),主動(dòng)從隊(duì)列當(dāng)中拉出下一個(gè)消息(如果隊(duì)列當(dāng)中沒(méi)有消息,chan.basic_get()會(huì)返回None, 因此下面代碼當(dāng)中print msg.body 會(huì)在沒(méi)有消息的時(shí)候崩掉):

msg=chan.basic_get("po_box") printmsg.body chan.basic_ack(msg.delivery_tag)

但是如果你想要應(yīng)用程序在消息到達(dá)的時(shí)候立即得到通知怎么辦?這種情況下不能使用chan.basic_get(),你需要用chan.basic_consume()注冊(cè)一個(gè)新消息到達(dá)的回調(diào)。

defrecv_callback(msg): print\'Received:\'+msg.body chan.basic_consume(queue=\'po_box\',no_ack=True, callback=recv_callback,consumer_tag="testtag") whileTrue: chan.wait() chan.basic_cancel("testtag")

chan.wait()放在一個(gè)無(wú)限循環(huán)里面,這個(gè)函數(shù)會(huì)等待在隊(duì)列上,直到下一個(gè)消息到達(dá)隊(duì)列。chan.basic_cancel()用來(lái)注銷(xiāo)該回調(diào)函數(shù)。參數(shù)consumer_tag當(dāng)中指定的字符串和chan.basic_consume()注冊(cè)的一直。在這個(gè)例子當(dāng)中chan.basic_cancel()不會(huì)被調(diào)用到,因?yàn)樯厦媸莻€(gè)無(wú)限循環(huán)…… 不過(guò)你需要知道這個(gè)調(diào)用,所以我把它放在了代碼里。

需要注意的另一個(gè)東西是no_ack參數(shù)。這個(gè)參數(shù)可以傳給chan.basic_get()chan.basic_consume(),默認(rèn)是false。當(dāng)從隊(duì)列當(dāng)中取出一個(gè)消息的時(shí)候,RabbitMQ需要應(yīng)用顯式地回饋說(shuō)已經(jīng)獲取到了該消息。如果一段時(shí)間內(nèi)不回饋,RabbitMQ會(huì)將該消息重新分配給另外一個(gè)綁定在該隊(duì)列上的消費(fèi)者。另一種情況是消費(fèi)者斷開(kāi)連接,但是獲取到的消息沒(méi)有回饋,則RabbitMQ同樣重新分配。如果將no_ack參數(shù)設(shè)置為true,則py-amqplib會(huì)為下一個(gè)AMQP請(qǐng)求添加一個(gè)no_ack屬性,告訴AMQP服務(wù)器不需要等待回饋。但是,大多數(shù)時(shí)候,你也許想要自己手工發(fā)送回饋,例如,需要在回饋之前將消息存入數(shù)據(jù)庫(kù)。回饋通常是通過(guò)調(diào)用chan.basic_ack()方法,使用消息的delivery_tag屬性作為參數(shù)。參見(jiàn)chan.basic_get()的實(shí)例代碼。

好了,這就是消費(fèi)者的全部代碼。

不過(guò)沒(méi)有人發(fā)送消息的話(huà),要消費(fèi)者何用?所以需要一個(gè)生產(chǎn)者。下面的代碼示例表明如何將一個(gè)簡(jiǎn)單消息發(fā)送到交換區(qū)“sorting_room”,并且標(biāo)記為路由鍵“jason” :

msg=amqp.Message("Testmessage!") msg.properties["delivery_mode"]=2 chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

你也許注意到我們?cè)O(shè)置消息的delivery_mode屬性為2,因?yàn)殛?duì)列和交換機(jī)都設(shè)置為durable的,這個(gè)設(shè)置將保證消息能夠持久化,也就是說(shuō),當(dāng)它還沒(méi)有送達(dá)消費(fèi)者之前如果RabbitMQ重啟則它能夠被恢復(fù)。

剩下的最后一件事情(生產(chǎn)者和消費(fèi)者都需要調(diào)用的)是關(guān)閉channel和連接:

chan.close() conn.close()

很簡(jiǎn)單吧。

現(xiàn)在我們已經(jīng)寫(xiě)好了生產(chǎn)者和消費(fèi)者,讓他們跑起來(lái)吧。假設(shè)你的RabbitMQ在localhost上安裝并且運(yùn)行。

打開(kāi)一個(gè)終端,執(zhí)行python ./amqp_consumer.py讓消費(fèi)者運(yùn)行,并且創(chuàng)建隊(duì)列、交換機(jī)和綁定。

然后在另一個(gè)終端運(yùn)行python ./amqp_publisher.py “AMQP rocks.”。如果一切良好,你應(yīng)該能夠在第一個(gè)終端看到輸出的消息。

付諸使用吧

我知道這個(gè)教程是非常粗淺的關(guān)于AMQP/RabbitMQ和如何使用Python訪(fǎng)問(wèn)的教程。希望這個(gè)可以說(shuō)明所有的概念如何在Python當(dāng)中被組合起來(lái)。同時(shí),我很高興回答我知道的問(wèn)題?!咀g注:譯者也是一樣的】。接下來(lái)是,集群化(clustering)!不過(guò)我需要先把它弄懂再說(shuō)。

文章名稱(chēng):[RabbitMQ+Python入門(mén)經(jīng)典]兔子和兔子窩
標(biāo)題網(wǎng)址:http://muchs.cn/article48/chedhp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、電子商務(wù)、App開(kāi)發(fā)動(dòng)態(tài)網(wǎng)站、網(wǎng)站策劃、網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)