rabbitmq的三種隊(duì)列以及使用方式(beego)
創(chuàng)新互聯(lián)建站堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:做網(wǎng)站、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的博白網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!提示:最上面右調(diào)用的統(tǒng)一調(diào)用,最下面有消費(fèi)的代碼注:消費(fèi)需根據(jù)條件修改并另起一個(gè)mian.go
面試官問(wèn)這個(gè)問(wèn)題,肯定是想知道你們公司有一個(gè)什么場(chǎng)景需要使用到這個(gè)Mq,這個(gè)場(chǎng)景有一個(gè)什么技術(shù)挑戰(zhàn)導(dǎo)致必須要用這個(gè)mq,用了這個(gè)mq之后有什么好處。mq經(jīng)典的使用場(chǎng)景有解耦,異步,削鋒。
而rabbitmq是如何進(jìn)行使用的他的使用發(fā)放是什么呢?
提示:先是統(tǒng)一的引入跟實(shí)例化
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
)
type Callback func(str1 string)
//Connect RabbitMQ連接函數(shù)
func Connect() (conn *amqp.Connection, err error) {//連接mq
conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
return conn, err
}
func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
r := s.String()
return &r
}
一、rabbitmq的普通隊(duì)列(路由模式)
1.生成者示例:這是最普通的rabbitmq的生成者
//Publish 發(fā)送端函數(shù)
//exchange交換機(jī)名稱
//queueName隊(duì)列名稱
//body發(fā)送內(nèi)容
func Publish(exchange string, queueName string, body string) error {//建立連接
conn, err := Connect()
if err != nil {return err
}
defer conn.Close()
//創(chuàng)建通道channel
channel, err := conn.Channel()
if err != nil {return err
}
defer channel.Close()
//創(chuàng)建隊(duì)列
q, err := channel.QueueDeclare(
queueName, //隊(duì)列名稱
true, //持久化
false,
false,
false,
nil,
)
if err != nil {return err
}
//發(fā)送消息
err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消費(fèi)者實(shí)例:這是普通隊(duì)列的消費(fèi)
//Consumer 接受方法
func Consumer(exchange string, queueName string, callback Callback) {//建立連接
conn, err := Connect()
defer conn.Close()
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建通道channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建queue
q, err := channel.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//輸出
msgs, err := channel.Consume(
q.Name,
"",
false, //手動(dòng)應(yīng)答
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf("Waiting for messages")
<-forever
}
func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
r := s.String()
return &r
}
func callback(s string) {fmt.Printf("msg:%s", s)
return
}
二、rabbitmq的并發(fā)隊(duì)列(主題模式)
1.生產(chǎn)者代碼如下(示例):
func PublishEx(exchange string, types string, routingKey string, body string) error {//建立連接
conn, err := Connect()
defer conn.Close()
if err != nil {return err
}
//創(chuàng)建channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {return err
}
//創(chuàng)建交換機(jī)
err = channel.ExchangeDeclare(
exchange,
types,
true,
false,
false,
false,
nil,
)
if err != nil {return err
}
err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消費(fèi)者代碼如下(示例):
func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立連接
conn, err := Connect()
defer conn.Close()
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建通道channel
channel, err := conn.Channel()
defer channel.Close()
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建交換機(jī)
err = channel.ExchangeDeclare(
exchange,
types,
true,
false,
false,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建隊(duì)列
q, err := channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//綁定
err = channel.QueueBind(
q.Name,
routingKey,
exchange,
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf("Waiting for messages\n")
<-forever
}
三、rabbitmq的雙隊(duì)列(死信隊(duì)列)
1.生產(chǎn)者代碼如下(示例):
func PublishDlx(exchangeA string, body string) error {//建立連接
conn, err := Connect()
if err != nil {return err
}
defer conn.Close()
//創(chuàng)建一個(gè)Channel
channel, err := conn.Channel()
if err != nil {return err
}
defer channel.Close()
//消息發(fā)送到A交換機(jī)
err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
return err
}
2.消費(fèi)者代碼如下(示例):
func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立連接
conn, err := Connect()
if err != nil {fmt.Println(err)
return
}
defer conn.Close()
//創(chuàng)建一個(gè)Channel
channel, err := conn.Channel()
if err != nil {fmt.Println(err)
return
}
defer channel.Close()
//創(chuàng)建A交換機(jī)
//創(chuàng)建A隊(duì)列
//A交換機(jī)和A隊(duì)列綁定
err = channel.ExchangeDeclare(
exchangeA, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建一個(gè)queue,指定消息過(guò)期時(shí)間,并且綁定過(guò)期以后發(fā)送到那個(gè)交換機(jī)
queueA, err := channel.QueueDeclare(
queueAName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
amqp.Table{ // 當(dāng)消息過(guò)期時(shí)把消息發(fā)送到 exchangeB
"x-dead-letter-exchange": exchangeB,
"x-message-ttl": ttl,
//"x-dead-letter-queue" : queueBName,
//"x-dead-letter-routing-key" :
},
)
if err != nil {fmt.Println(err)
return
}
//A交換機(jī)和A隊(duì)列綁定
err = channel.QueueBind(
queueA.Name, // queue name
"", // routing key
exchangeA, // exchange
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建B交換機(jī)
//創(chuàng)建B隊(duì)列
//B交換機(jī)和B隊(duì)列綁定
err = channel.ExchangeDeclare(
exchangeB, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//創(chuàng)建一個(gè)queue
queueB, err := channel.QueueDeclare(
queueBName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {fmt.Println(err)
return
}
//B交換機(jī)和B隊(duì)列綁定
err = channel.QueueBind(
queueB.Name, // queue name
"", // routing key
exchangeB, // exchange
false,
nil,
)
if err != nil {fmt.Println(err)
return
}
msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)
if err != nil {fmt.Println(err)
return
}
forever := make(chan bool)
go func() {for d := range msgs { s := BytesToString(&(d.Body))
callback(*s)
d.Ack(false)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
<-forever
}
剩下的統(tǒng)一調(diào)用消費(fèi)者模板
package main
import (
"encoding/json"
"fmt"
"github.com/beego/beego/v2/client/orm"
beego "github.com/beego/beego/v2/server/web"
"github.com/garyburd/redigo/redis"
"goApi/models"
_ "goApi/routers"
redisClient "goApi/services"
"goApi/services/mq"
"strconv"
)
func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")
//err := orm.RegisterDataBase("default", "mysql", "fukw:ipx4JtpXR6sCxmKt@tcp(127.0.0.1)/fukw?charset=utf8")
err := orm.RegisterDataBase("default", "mysql", "root:root@tcp(127.0.0.1)/fukw?charset=utf8")
if err != nil { fmt.Println("連接數(shù)據(jù)庫(kù)失敗")
}
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { fmt.Println("redis連接失敗")
}
defer c.Close()
mq.Consumer("", "fyouku_top", callback)
fmt.Printf("mian執(zhí)行成功")
beego.Run()
}
func callback(s string) {type Data struct { VideoId int
}
var data Data
err := json.Unmarshal([]byte(s), &data)
videoInfo, err := models.GetVideoInfo(data.VideoId)
if err == nil { conn := redisClient.RedisConnect()
defer conn.Close()
//更新排行榜
//執(zhí)行的代碼我這里是排行榜
redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)
redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)
conn.Do("zincrby", redisChannelKey, 1, data.VideoId)
conn.Do("zincrby", redisTypeKey, 1, data.VideoId)
}
fmt.Printf("msg is :%s\n", s)
}
這就是rabbitmq的3種隊(duì)列的的書寫形式
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
當(dāng)前名稱:rabbitmq的三種隊(duì)列以及使用方式(go)-創(chuàng)新互聯(lián)
文章源于:http://muchs.cn/article18/csjigp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、網(wǎng)站改版、全網(wǎng)營(yíng)銷推廣、網(wǎng)站制作、網(wǎng)站內(nèi)鏈、品牌網(wǎng)站設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容