go語言同步機(jī)制有哪些 go語言通道

Golang入門到項(xiàng)目實(shí)戰(zhàn) | golang并發(fā)變成之通道channel

Go提供了一種稱為通道的機(jī)制,用于在goroutine之間共享數(shù)據(jù)。當(dāng)您作為goroutine執(zhí)行并發(fā)活動時(shí),需要在goroutine之間共享資源或數(shù)據(jù),通道充當(dāng)goroutine之間的管道(管道)并提供一種機(jī)制來保證同步交換。

張家川回族自治ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場景,ssl證書未來市場廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書銷售渠道,可以享受市場價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:028-86922220(備注:SSL證書合作)期待與您的合作!

根據(jù)數(shù)據(jù)交換的行為,有兩種類型的通道:無緩沖通道和緩沖通道。無緩沖通道用于執(zhí)行g(shù)oroutine之間的同步通信,而緩沖通道用于執(zhí)行異步通信。無緩沖通道保證在發(fā)送和接收發(fā)生的瞬間兩個(gè)goroutine之間的交換。緩沖通道沒有這樣的保證。

通道由make函數(shù)創(chuàng)建,該函數(shù)指定chan關(guān)鍵字和通道的元素類型。

這是創(chuàng)建無緩沖和緩沖通道的代碼塊:

語法

使用內(nèi)置函數(shù)make創(chuàng)建無緩沖和緩沖通道。make的第一個(gè)參數(shù)需要關(guān)鍵字chan,然后是通道允許交換的數(shù)據(jù)類型。

這是將值發(fā)送到通道的代碼塊需要使用-運(yùn)算符:

語法

一個(gè)包含5個(gè)值的緩沖區(qū)的字符串類型的goroutine1通道。然后我們通過通道發(fā)送字符串“Australia”。

這是從通道接收值的代碼塊:

語法

- 運(yùn)算符附加到通道變量(goroutine1)的左側(cè),以接收來自通道的值。

在無緩沖通道中,在接收到任何值之前沒有能力保存它。在這種類型的通道中,發(fā)送和接收goroutine在任何發(fā)送或接收操作完成之前的同一時(shí)刻都準(zhǔn)備就緒。如果兩個(gè)goroutine沒有在同一時(shí)刻準(zhǔn)備好,則通道會讓執(zhí)行其各自發(fā)送或接收操作的goroutine首先等待。同步是通道上發(fā)送和接收之間交互的基礎(chǔ)。沒有另一個(gè)就不可能發(fā)生。

在緩沖通道中,有能力在接收到一個(gè)或多個(gè)值之前保存它們。在這種類型的通道中,不要強(qiáng)制goroutine在同一時(shí)刻準(zhǔn)備好執(zhí)行發(fā)送和接收。當(dāng)發(fā)送和接收阻塞時(shí)也有不同的條件。只有當(dāng)通道中沒有要接收的值時(shí),接收才會阻塞。僅當(dāng)沒有可用緩沖區(qū)來放置正在發(fā)送的值時(shí),發(fā)送才會阻塞。

實(shí)例

運(yùn)行結(jié)果

go語言無緩沖的channel

無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。

這種類型的通道要求發(fā)送goroutine和接收goroutine同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。否則,通道會導(dǎo)致先執(zhí)行發(fā)送或接收操作的 goroutine 阻塞等待。

這種對通道進(jìn)行發(fā)送和接收的交互行為本身就是同步的。其中任意一個(gè)操作都無法離開另一個(gè)操作單獨(dú)存在。

阻塞:由于某種原因數(shù)據(jù)沒有到達(dá),當(dāng)前協(xié)程(線程)持續(xù)處于等待狀態(tài),直到條件滿足,才接觸阻塞。

同步:在兩個(gè)或多個(gè)協(xié)程(線程)間,保持?jǐn)?shù)據(jù)內(nèi)容一致性的機(jī)制。

下圖展示兩個(gè) goroutine 如何利用無緩沖的通道來共享一個(gè)值:

在第 1 步,兩個(gè) goroutine 都到達(dá)通道,但哪個(gè)都沒有開始執(zhí)行發(fā)送或者接收。

在第 2 步,左側(cè)的 goroutine 將它的手伸進(jìn)了通道,這模擬了向通道發(fā)送數(shù)據(jù)的行為。這時(shí),這個(gè) goroutine 會在通道中被鎖住,直到交換完成。

在第 3 步,右側(cè)的 goroutine 將它的手放入通道,這模擬了從通道里接收數(shù)據(jù)。這個(gè) goroutine 一樣也會在通道中被鎖住,直到交換完成。

在第 4 步和第 5 步,進(jìn)行交換,并最終,在第 6 步,兩個(gè) goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個(gè) goroutine 現(xiàn)在都可以去做別的事情了。

如果沒有指定緩沖區(qū)容量,那么該通道就是同步的,因此會阻塞到發(fā)送者準(zhǔn)備好發(fā)送和接收者準(zhǔn)備好接收。

無緩沖channel: —— 同步通信

Golang kafka簡述和操作(sarama同步異步和消費(fèi)組)

一、Kafka簡述

1. 為什么需要用到消息隊(duì)列

異步:對比以前的串行同步方式來說,可以在同一時(shí)間做更多的事情,提高效率;

解耦:在耦合太高的場景,多個(gè)任務(wù)要對同一個(gè)數(shù)據(jù)進(jìn)行操作消費(fèi)的時(shí)候,會導(dǎo)致一個(gè)任務(wù)的處理因?yàn)榱硪粋€(gè)任務(wù)對數(shù)據(jù)的操作變得及其復(fù)雜。

緩沖:當(dāng)遇到突發(fā)大流量的時(shí)候,消息隊(duì)列可以先把所有消息有序保存起來,避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個(gè)平穩(wěn)的速率去消費(fèi)這些消息。

2.為什么選擇kafka呢?

這沒有絕對的好壞,看個(gè)人需求來選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點(diǎn),可見原文

kafka的優(yōu)點(diǎn):

1.支持多個(gè)生產(chǎn)者和消費(fèi)者2.支持broker的橫向拓展3.副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過topic將數(shù)據(jù)進(jìn)行分類5.通過分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實(shí)現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級的消息延遲9.一個(gè)消費(fèi)者可以支持多種topic的消息10.對CPU和內(nèi)存的消耗比較小11.對網(wǎng)絡(luò)開銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復(fù)制13.支持鏡像集群

kafka的缺點(diǎn):

1.由于是批量發(fā)送,所以數(shù)據(jù)達(dá)不到真正的實(shí)時(shí)2.對于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進(jìn)行元數(shù)據(jù)管理7.會丟失數(shù)據(jù),并且不支持事務(wù)8.可能會重復(fù)消費(fèi)數(shù)據(jù),消息會亂序,可用保證一個(gè)固定的partition內(nèi)部的消息是有序的,但是一個(gè)topic有多個(gè)partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護(hù)一般都比mq高

3. Golang 操作kafka

3.1. kafka的環(huán)境

網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進(jìn)行的搭建,有需要的私我,可以發(fā)yaml文件

3.2. 第三方庫

github.com/Shopify/sarama // kafka主要的庫*github.com/bsm/sarama-cluster // kafka消費(fèi)組

3.3. 消費(fèi)者

單個(gè)消費(fèi)者

funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個(gè)分區(qū)開一個(gè)go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過來,然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}

消費(fèi)組

funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實(shí)時(shí)寫入kafka,有可能在程序crash時(shí)丟掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生產(chǎn)者

同步生產(chǎn)者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個(gè)分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}

異步生產(chǎn)者

funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個(gè)選項(xiàng)config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個(gè)部分一定要寫,不然通道會被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}

3.5. 結(jié)果展示-

同步生產(chǎn)打印:

分區(qū)ID:0,offset:90

消費(fèi)打?。?/p>

Partition:0,Offset:90,key:,value:Hello World!

異步生產(chǎn)打?。?/p>

async:7272async:7616async:998

消費(fèi)打?。?/p>

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

文章題目:go語言同步機(jī)制有哪些 go語言通道
轉(zhuǎn)載來于:http://muchs.cn/article24/dohioce.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、App設(shè)計(jì)、小程序開發(fā)網(wǎng)站設(shè)計(jì)公司、網(wǎng)站內(nèi)鏈、Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司