RabbitMQ延遲隊(duì)列怎么利用Python實(shí)現(xiàn)-創(chuàng)新互聯(lián)

本篇文章為大家展示了RabbitMQ延遲隊(duì)列怎么利用Python實(shí)現(xiàn),內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

創(chuàng)新互聯(lián)基于成都重慶香港及美國(guó)等地區(qū)分布式IDC機(jī)房數(shù)據(jù)中心構(gòu)建的電信大帶寬,聯(lián)通大帶寬,移動(dòng)大帶寬,多線BGP大帶寬租用,是為眾多客戶提供專業(yè)服務(wù)器托管報(bào)價(jià),主機(jī)托管價(jià)格性價(jià)比高,為金融證券行業(yè)成都天府聯(lián)通服務(wù)器托管,ai人工智能服務(wù)器托管提供bgp線路100M獨(dú)享,G口帶寬及機(jī)柜租用的專業(yè)成都idc公司。

延遲隊(duì)列的基礎(chǔ)原理Time To Live(TTL)


RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來控制消息的生存時(shí)間,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)
RabbitMQ消息的過期時(shí)間有兩種方法設(shè)置。

通過隊(duì)列(Queue)的屬性設(shè)置,隊(duì)列中所有的消息都有相同的過期時(shí)間。(本次延遲隊(duì)列采用的方案)對(duì)消息單獨(dú)設(shè)置,每條消息TTL可以不同。

如果同時(shí)使用,則消息的過期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過設(shè)置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。

  • x-dead-letter-exchange:出現(xiàn)死信(dead letter)之后將dead letter重新發(fā)送到指定exchange

  • x-dead-letter-routing-key:出現(xiàn)死信(dead letter)之后將dead letter重新按照指定的routing-key發(fā)送

隊(duì)列中出現(xiàn)死信(dead letter)的情況有:

  • 消息或者隊(duì)列的TTL過期。(延遲隊(duì)列利用的特性)

  • 隊(duì)列達(dá)到較大長(zhǎng)度

  • 消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個(gè)特性,將隊(duì)列設(shè)置TTL規(guī)則,隊(duì)列TTL過期后消息會(huì)變成死信,然后利用DLX特性將其轉(zhuǎn)發(fā)到另外的交換機(jī)和隊(duì)列就可以被重新消費(fèi),達(dá)到延遲消費(fèi)效果。

RabbitMQ延遲隊(duì)列怎么利用Python實(shí)現(xiàn)

延遲隊(duì)列設(shè)計(jì)及實(shí)現(xiàn)(Python)

從上面描述,延遲隊(duì)列的實(shí)現(xiàn)大致分為兩步:

產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,因?yàn)槲业男枨笾惺撬械南⒀舆t處理時(shí)間相同,所以本實(shí)現(xiàn)中采用 Queue TTL設(shè)置隊(duì)列的TTL,如果需要將隊(duì)列中的消息設(shè)置不同的延遲處理時(shí)間,則設(shè)置Per-Message TTL(官方文檔)

設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則,Dead Letter Exchanges設(shè)置方法(官方文檔)


完整代碼如下:

"""
Created on Fri Aug 3 17:00:44 2018

@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
  def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
    self.channel = self.connection.channel()
    self._declare_retry_queue() #RetryQueue and RetryExchange
    logging.debug("connection established")
  def close_connection(self):
    self.connection.close()
    logging.debug("connection closed")
  def declare_exchange(self, exchange):
    self.channel.exchange_declare(exchange=exchange,
                   exchange_type=self.exchange_type,
                   durable=True)
  def declare_queue(self, queue):
    self.channel.queue_declare(queue=queue,
                  durable=True,)
  def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
    """
    創(chuàng)建延遲隊(duì)列
    :param TTL: ttl的單位是us,ttl=60000 表示 60s
    :param queue:
    :param DLX:死信轉(zhuǎn)發(fā)的exchange
    :return:
    """
    arguments={}
    if DLX:
      #設(shè)置死信轉(zhuǎn)發(fā)的exchange
      arguments[ 'x-dead-letter-exchange']=DLX
    if TTL:
      arguments['x-message-ttl']=TTL
    print(arguments)
    self.channel.queue_declare(queue=queue,
                  durable=True,
                  arguments=arguments)
  def _declare_retry_queue(self):
    """
    創(chuàng)建異常交換器和隊(duì)列,用于存放沒有正常處理的消息。
    :return:
    """
    self.channel.exchange_declare(exchange='RetryExchange',
                   exchange_type='fanout',
                   durable=True)
    self.channel.queue_declare(queue='RetryQueue',
                  durable=True)
    self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
  def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
    """
    發(fā)送消息到指定的交換器
    :param exchange: RabbitMQ交換器
    :param msg: 消息實(shí)體,是一個(gè)序列化的JSON字符串
    :return:
    """
    if delay==0:
      self.declare_queue(routing_key)
    else:
      self.declare_delay_queue(routing_key,TTL=TTL)
    if exchange!='':
      self.declare_exchange(exchange)
    self.channel.basic_publish(exchange=exchange,
                  routing_key=routing_key,
                  body=msg,
                  properties=pika.BasicProperties(
                    delivery_mode=2,
                    type=exchange
                  ))
    self.close_connection()
    print("message send out to %s" % exchange)
    logging.debug("message send out to %s" % exchange)
  def start_consume(self,callback,queue='#',delay=1):
    """
    啟動(dòng)消費(fèi)者,開始消費(fèi)RabbitMQ中的消息
    :return:
    """
    if delay==1:
      queue='RetryQueue'
    else:
      self.declare_queue(queue)
    self.channel.basic_qos(prefetch_count=1)
    try:
      self.channel.basic_consume( # 消費(fèi)消息
        callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
        queue=queue, # 你要從那個(gè)隊(duì)列里收消息
      )
      self.channel.start_consuming()
    except KeyboardInterrupt:
      self.stop_consuming()
  def stop_consuming(self):
    self.channel.stop_consuming()
    self.close_connection()
  def message_handle_successfully(channel, method):
    """
    如果消息處理正常完成,必須調(diào)用此方法,
    否則RabbitMQ會(huì)認(rèn)為消息處理不成功,重新將消息放回待執(zhí)行隊(duì)列中
    :param channel: 回調(diào)函數(shù)的channel參數(shù)
    :param method: 回調(diào)函數(shù)的method參數(shù)
    :return:
    """
    channel.basic_ack(delivery_tag=method.delivery_tag)
  def message_handle_failed(channel, method):
    """
    如果消息處理失敗,應(yīng)該調(diào)用此方法,會(huì)自動(dòng)將消息放入異常隊(duì)列
    :param channel: 回調(diào)函數(shù)的channel參數(shù)
    :param method: 回調(diào)函數(shù)的method參數(shù)
    :return:
    """
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發(fā)布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")

消費(fèi)者代碼如下:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
    msg = body.decode()
    print(msg)
    # 如果處理成功,則調(diào)用此消息回復(fù)ack,表示消息成功處理完成。
    RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)

上述內(nèi)容就是RabbitMQ延遲隊(duì)列怎么利用Python實(shí)現(xiàn),你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站欄目:RabbitMQ延遲隊(duì)列怎么利用Python實(shí)現(xiàn)-創(chuàng)新互聯(lián)
文章分享:http://muchs.cn/article14/dpecge.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設(shè)Google、網(wǎng)站營(yíng)銷虛擬主機(jī)、做網(wǎng)站網(wǎng)站收錄

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)