Python協(xié)程是怎么實現(xiàn)的

這篇“Python協(xié)程是怎么實現(xiàn)的”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Python協(xié)程是怎么實現(xiàn)的”文章吧。

在彭陽等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站建設(shè)、網(wǎng)站制作 網(wǎng)站設(shè)計制作按需開發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,全網(wǎng)營銷推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,彭陽網(wǎng)站建設(shè)費用合理。

非阻塞 + 回調(diào) + IO 多路復(fù)用

我們知道一臺主機的資源有限,一顆 CPU、一塊磁盤、一張網(wǎng)卡,如何同時服務(wù)上百個請求呢?多進程模式是最初的解決方案。內(nèi)核把 CPU 的執(zhí)行時間切分成許多時間片(timeslice),比如 1 秒鐘可以切分為 100 個 10 毫秒的時間片,每個時間片再分發(fā)給不同的進程,通常,每個進程需要多個時間片才能完成一個請求。

這樣雖然微觀上,比如說就這 10 毫秒時間 CPU 只能執(zhí)行一個進程,但宏觀上 1 秒鐘執(zhí)行了 100 個時間片,于是每個時間片所屬進程中的請求也得到了執(zhí)行,這就實現(xiàn)了請求的并發(fā)執(zhí)行。

不過,每個進程的內(nèi)存空間都是獨立的,因此使用多進程實現(xiàn)并發(fā)就有兩個缺點:一是內(nèi)核的管理成本高,二是無法簡單地通過內(nèi)存同步數(shù)據(jù),很不方便。于是,多線程模式就出現(xiàn)了,多線程模式通過共享內(nèi)存地址空間,解決了這兩個問題。

然而,共享地址空間雖然可以方便地共享對象,但這也導(dǎo)致一個問題,那就是任何一個線程出錯時,進程中的所有線程會跟著一起崩潰。這也是如 Nginx 等強調(diào)穩(wěn)定性的服務(wù)堅持使用多進程模式的原因。

但事實上無論基于多進程還是多線程,都難以實現(xiàn)高并發(fā),主要有以下兩個原因。

  • 首先,單個線程消耗的內(nèi)存過多,比如 64 位的 Linux 為每個線程的棧分配了 8MB 的內(nèi)存,此外為了提升后續(xù)內(nèi)存分配的性能,還為每個線程預(yù)分配了 64MB 的內(nèi)存作為堆內(nèi)存池(Thread Area)。所以,我們沒有足夠的內(nèi)存去開啟幾萬個線程實現(xiàn)并發(fā)。

  • 其次,切換請求是內(nèi)核通過切換線程實現(xiàn)的,什么時候會切換線程呢?不只時間片用盡,當(dāng)調(diào)用阻塞方法時,內(nèi)核為了讓 CPU 充分工作,也會切換到其他線程執(zhí)行。而一次上下文切換的成本在幾十納秒到幾微秒之間,當(dāng)線程繁忙且數(shù)量眾多時,這些切換會消耗絕大部分的 CPU 運算能力。

下圖以磁盤 IO 為例,描述了多線程中使用阻塞方法讀磁盤,2 個線程間的切換方式。

Python協(xié)程是怎么實現(xiàn)的

通過多線程的方式,一個線程處理一個請求,從而實現(xiàn)并發(fā)。但很明顯,操作系統(tǒng)能創(chuàng)建線程數(shù)是有限的,因為線程越多資源占用就越多,而且線程之間的切換成本也比較大,因為涉及到內(nèi)核態(tài)和用戶態(tài)之間的切換。

那么問題來了,怎么才能實現(xiàn)高并發(fā)呢?答案是「把上圖中由內(nèi)核實現(xiàn)的請求切換工作,交由用戶態(tài)的代碼來完成就可以了」。異步化編程通過應(yīng)用層代碼實現(xiàn)了請求切換,降低了切換成本和內(nèi)存占用空間。

異步化依賴于 IO 多路復(fù)用機制,比如 Linux 的 epoll,同時,必須把阻塞方法更改為非阻塞方法,才能避免內(nèi)核切換帶來的巨大消耗。Nginx、redis 等高性能服務(wù)都依賴異步化實現(xiàn)了百萬量級的并發(fā)。

下圖描述了異步 IO 的非阻塞讀和異步框架結(jié)合后,是如何切換請求的。

Python協(xié)程是怎么實現(xiàn)的

注意圖中的變化,之前是一個線程處理一個請求,現(xiàn)在是一個線程處理多個請求,這就是我們之前說的「非阻塞+回調(diào)」的方式。它依賴操作系統(tǒng)提供的 IO 多路復(fù)用,比如 Linux 的 epoll,BSD 的 kqueue。

此時的讀寫操作都相當(dāng)于一個事件,并為每一個事件都注冊相應(yīng)的回調(diào)函數(shù),然后線程不會阻塞(因為讀寫操作此時是非阻塞的),而是可以做其它事情,然后由 epoll 來對這些事件進行統(tǒng)一管理。

一旦事件發(fā)生(滿足可讀、可寫時),那么 epoll 就會告知線程,然后線程執(zhí)行為該事件注冊的回調(diào)函數(shù)。

為了更好地理解,我們再以 Redis 為例,介紹一下非阻塞 IO 和 IO 多路復(fù)用。

127.0.0.1:6379> get name
"satori"

首先我們可以使用 get 命令,獲取一個 key 對應(yīng)的 value,那么問題來了,以上對于 Redis 服務(wù)端而言,都發(fā)生了哪些事情呢?

服務(wù)端必須要先監(jiān)聽客戶端請求(bind/listen),然后當(dāng)客戶端到來時與其建立連接(accept),從 socket 中讀取客戶端的請求(recv),對請求進行解析(parse),這里解析出的請求類型是 get、key 是 "name",再根據(jù) key 獲取對應(yīng)的 value,最后返回給客戶端,也就是向 socket 寫入數(shù)據(jù)(send)。

Python協(xié)程是怎么實現(xiàn)的

以上所有操作都是由 Redis 主線程依次執(zhí)行的,但是里面會有潛在的阻塞點,分別是 accept 和 recv。

如果是阻塞 IO,當(dāng) Redis 監(jiān)聽到一個客戶端有連接請求、但卻一直未能成功建立連接,那么主線程會一直阻塞在 accept 函數(shù)這里,導(dǎo)致其它客戶端無法和 Redis 建立連接。類似的,當(dāng) Redis 通過 recv 從客戶端讀取數(shù)據(jù)時,如果數(shù)據(jù)一直沒有到達,那么 Redis 主線程也會一直阻塞在 recv 這一步,因此這就導(dǎo)致了 Redis 的效率會變得低下。

非阻塞 IO

但很明顯,Redis 不會允許這種情況發(fā)生,因為以上都是阻塞 IO 會面臨的情況,而 Redis 采用的是非阻塞 IO,也就是將 socket 設(shè)置成了非阻塞模式。首先在 socket 模型中,調(diào)用 socket() 方法會返回主動套接字;調(diào)用 bind() 方法綁定 IP 和 端口,再調(diào)用 listen() 方法將主動套接字轉(zhuǎn)化為監(jiān)聽套接字;最后監(jiān)聽套接字調(diào)用 accept() 方法等待客戶端連接的到來,當(dāng)和客戶端建立連接時再返回已連接套接字,而后續(xù)就通過已連接套接字來和客戶端進行數(shù)據(jù)的接收與發(fā)送。

但是注意:我們說在 listen() 這一步,會將主動套接字轉(zhuǎn)化為監(jiān)聽套接字,而此時的監(jiān)聽套接字的類型是阻塞的,阻塞類型的監(jiān)聽套接字在調(diào)用 accept() 方法時,如果沒有客戶端來連接的話,就會一直處于阻塞狀態(tài),那么此時主線程就沒法干其它事情了。所以在 listen() 的時候可以將其設(shè)置為非阻塞,而非阻塞的監(jiān)聽套接字在調(diào)用 accept() 時,如果沒有客戶端連接請求到達時,那么主線程就不會傻傻地等待了,而是會直接返回,然后去做其它的事情。

類似的,我們在創(chuàng)建已連接套接字的時候也可以將其類型設(shè)置為非阻塞,因為阻塞類型的已連接套接字在調(diào)用 send() / recv() 的時候也會處于阻塞狀態(tài),比如當(dāng)客戶端一直不發(fā)數(shù)據(jù)的時候,已連接套接字就會一直阻塞在 rev() 這一步。如果是非阻塞類型的已連接套接字,那么當(dāng)調(diào)用 recv() 但卻收不到數(shù)據(jù)時,也不用處于阻塞狀態(tài),同樣可以直接返回去做其它事情。

Python協(xié)程是怎么實現(xiàn)的

但是有兩點需要注意:

1)雖然 accept() 不阻塞了,在沒有客戶端連接時 Redis 主線程可以去做其它事情,但如果后續(xù)有客戶端來連接,Redis 要如何得知呢?因此必須要有一種機制,能夠繼續(xù)在監(jiān)聽套接字上等待后續(xù)連接請求,并在請求到來時通知 Redis。

2)send() / recv() 不阻塞了,相當(dāng)于 IO 的讀寫流程不再是阻塞的,讀寫方法都會瞬間完成并且返回,也就是它會采用能讀多少就讀多少、能寫多少就寫多少的策略來執(zhí)行 IO 操作,這顯然更符合我們對性能的追求。但這樣同樣會面臨一個問題,就是當(dāng)我們執(zhí)行讀取操作時,有可能只讀取了一部分數(shù)據(jù),剩余的數(shù)據(jù)客戶端還沒發(fā)過來,那么這些這些數(shù)據(jù)何時可讀呢?同理寫數(shù)據(jù)也是這種情況,當(dāng)緩沖區(qū)滿了,而我們的數(shù)據(jù)還沒有寫完,那么剩下的數(shù)據(jù)又何時可寫呢?因此同樣要有一種機制,能夠在 Redis 主線程做別的事情的時候繼續(xù)監(jiān)聽已連接套接字,并且有數(shù)據(jù)可讀寫的時候通知 Redis。

這樣才能保證 Redis 線程既不會像基本 IO 模型中一直在阻塞點等待,也不會無法處理實際到達的客戶端連接請求和可讀寫的數(shù)據(jù),而上面所提到的機制便是 IO 多路復(fù)用。

IO 多路復(fù)用

I/O 多路復(fù)用機制是指一個線程處理多個 IO 流,也就是我們經(jīng)常聽到的 select/poll/epoll。關(guān)于這三者的區(qū)別我們就不說了,它們所做的事情都一樣,無非是性能和實現(xiàn)原理上有差異。select 是所有系統(tǒng)都支持,而 epoll 只有 Linux 支持。

簡單來說,在 Redis 只運行單線程的情況下,該機制允許內(nèi)核中同時存在多個監(jiān)聽套接字和已連接套接字。內(nèi)核會一直監(jiān)聽這些套接字上的連接請求或數(shù)據(jù)請求,一旦有請求到達就會交給 Redis 線程處理,這樣就實現(xiàn)了一個 Redis 線程處理多個 IO 流的效果。

Python協(xié)程是怎么實現(xiàn)的

上圖就是基于多路復(fù)用的 Redis IO 模型,圖中的 FD 就是套接字,可以是監(jiān)聽套接字,也可以是已連接套接字,Redis 會通過 epoll 機制來讓內(nèi)核幫忙監(jiān)聽這些套接字。而此時 Redis 線程或者說主線程,不會阻塞在某一個特定的套接字上,也就是說不會阻塞在某一個特定的客戶端請求處理上。因此 Redis 可以同時和多個客戶端連接并處理請求,從而提升并發(fā)性。

但為了在請求到達時能夠通知 Redis 線程,epoll 提供了基于事件的回調(diào)機制,即針對不同事件的發(fā)生,調(diào)用相應(yīng)的處理函數(shù)。

那么回調(diào)機制是怎么工作的呢?以上圖為例,首先 epoll 一旦監(jiān)測到 FD 上有請求到達,就會觸發(fā)相應(yīng)的事件。這些事件會被放進一個隊列中,Redis 主線程會對該事件隊列不斷進行處理,這樣一來 Redis 就無需一直輪詢是否有請求發(fā)生,從而避免資源的浪費。

同時,Redis 在對事件隊列中的事件進行處理時,會調(diào)用相應(yīng)的處理函數(shù),這就實現(xiàn)了基于事件的回調(diào)。因為 Redis 一直在對事件隊列進行處理,所以能及時響應(yīng)客戶端請求,提升 Redis 的響應(yīng)性能。

我們以實際的連接請求和數(shù)據(jù)讀取請求為例,再解釋一下。連接請求和數(shù)據(jù)讀取請求分別對應(yīng) Accept 事件和 Read 事件,Redis 分別對這兩個事件注冊 accept 和 get 回調(diào)函數(shù),當(dāng) Linux 內(nèi)核監(jiān)聽到有連接請求或數(shù)據(jù)讀取請求時,就會觸發(fā) Accept 事件或 Read 事件,然后通知主線程,回調(diào)注冊的 accept 函數(shù)或 get 函數(shù)。

就像病人去醫(yī)院看病,在醫(yī)生實際診斷之前每個病人(類似于請求)都需要先分診、測體溫、登記等等。如果這些工作都由醫(yī)生完成,那么醫(yī)生的工作效率就會很低。所以醫(yī)院設(shè)置了分診臺,分診臺會一直處理這些診斷前的工作(類似于 Linux 內(nèi)核監(jiān)聽請求),然后再轉(zhuǎn)交給醫(yī)生做實際診斷,這樣即使一個醫(yī)生(相當(dāng)于 Redis 的主線程)也能有很高的效率。

這里需要再補充一下:我們上面提到的異步 IO 不是真正意義上的異步 IO,而是基于 IO 多路復(fù)用實現(xiàn)的異步化。但 IO 多路復(fù)用本質(zhì)上是同步 IO,只是它可以同時監(jiān)聽多個文件描述符,一旦某個描述符的讀寫操作就緒,就能夠通知應(yīng)用程序進行相應(yīng)的讀寫操作。至于真正意義的異步 IO,操作系統(tǒng)也是支持的,但支持的不太理想,所以現(xiàn)在使用的都是 IO 多用復(fù)用,并代指異步 IO。

為什么不推薦這種編程模式?

必須要承認的是,編寫這種異步化代碼能夠帶來很高的性能收益,Redis、Nginx 已經(jīng)證明了這一點。

但是這種編程模式,在實際工作中很容易出錯,因為所有阻塞函數(shù),都需要通過非阻塞的系統(tǒng)調(diào)用加上回調(diào)注冊的方式拆分成兩個函數(shù)。說白了就是我們的邏輯不能夠直接執(zhí)行,必須把它們放在一個單獨的函數(shù)里面,然后這個函數(shù)以回調(diào)的方式注冊給 IO 多路復(fù)用。

這種編程模式違反了軟件工程的內(nèi)聚性原則,函數(shù)之間同步數(shù)據(jù)也更復(fù)雜。特別是條件分支眾多、涉及大量系統(tǒng)調(diào)用時,異步化的改造工作會非常困難,盡管它的性能很高。

下面我們用 Python 編寫一段代碼,實際體驗一下這種編程模式,看看它復(fù)雜在哪里。

from urllib.parse import urlparse
import socket
from io import BytesIO
# selectors 里面提供了多種"多路復(fù)用器"
# 除了 select、poll、epoll 之外
# 還有 kqueue,這個是針對 BSD 平臺的
try:
 from selectors import (
 SelectSelector,
 PollSelector,
 EpollSelector,
 KqueueSelector
 )
except ImportError:
 pass
# 由于種類比較多,所以提供了DefaultSelector
# 會根據(jù)當(dāng)前的系統(tǒng)種類,自動選擇一個合適的多路復(fù)用器
from selectors import (
 DefaultSelector,
 EVENT_READ,# 讀事件
 EVENT_WRITE,# 寫事件
)
class RequestHandler:
 """
 向指定的 url 發(fā)請求
 獲取返回的內(nèi)容
 """
 selector = DefaultSelector()
 tasks = {"unfinished": 0}
 def __init__(self, url):
 """
 :param url: http://localhost:9999/v1/index
 """
 self.tasks["unfinished"] += 1
 url = urlparse(url)
 # 根據(jù) url 解析出 域名、端口、查詢路徑
 self.netloc = url.netloc# 域名:端口
 self.path = url.path or "/"# 查詢路徑
 # 創(chuàng)建 socket
 self.client = socket.socket()
 # 設(shè)置成非阻塞
 self.client.setblocking(False)
 # 用于接收數(shù)據(jù)的緩存
 self.buffer = BytesIO()
 def get_result(self):
 """
 發(fā)送請求,進行下載
 :return:
 """
 # 連接到指定的服務(wù)器
 # 如果沒有 : 說明只有域名沒有端口
 # 那么默認訪問 80 端口
 if ":" not in self.netloc:
 host, port = self.netloc, 80
 else:
 host, port = self.netloc.split(":")
 # 由于 socket 非阻塞,所以連接可能尚未建立好
 try:
 self.client.connect((host, int(port)))
 except BlockingIOError:
 pass
 # 我們上面是建立連接,連接建立好就該發(fā)請求了
 # 但是連接什么時候建立好我們并不知道,只能交給操作系統(tǒng)
 # 所以我們需要通過 register 給 socket 注冊一個回調(diào)函數(shù)
 # 參數(shù)一:socket 的文件描述符
 # 參數(shù)二:事件
 # 參數(shù)三:當(dāng)事件發(fā)生時執(zhí)行的回調(diào)函數(shù)
 self.selector.register(self.client.fileno(),
EVENT_WRITE,
self.send)
 # 表示當(dāng) self.client 這個 socket 滿足可寫時
 # 就去執(zhí)行 self.send
 # 翻譯過來就是連接建立好了,就去發(fā)請求
 # 可以看到,一個阻塞調(diào)用,我們必須拆成兩個函數(shù)去寫
 def send(self, key):
 """
 連接建立好之后,執(zhí)行的回調(diào)函數(shù)
 回調(diào)需要接收一個參數(shù),這是一個 namedtuple
 內(nèi)部有如下字段:'fileobj', 'fd', 'events', 'data'
 key.fd 就是 socket 的文件描述符
 key.data 就是給 socket 綁定的回調(diào)
 :param key:
 :return:
 """
 payload = (f"GET {self.path} HTTP/1.1rn"
f"Host: {self.netloc}rn"
"Connection: closernrn")
 # 執(zhí)行此函數(shù),說明事件已經(jīng)觸發(fā)
 # 我們要將綁定的回調(diào)函數(shù)取消
 self.selector.unregister(key.fd)
 # 發(fā)送請求
 self.client.send(payload.encode("utf-8"))
 # 請求發(fā)送之后就要接收了,但是啥時候能接收呢?
 # 還是要交給操作系統(tǒng),所以仍然需要注冊回調(diào)
 self.selector.register(self.client.fileno(),
EVENT_READ,
self.recv)
 # 表示當(dāng) self.client 這個 socket 滿足可讀時
 # 就去執(zhí)行 self.recv
 # 翻譯過來就是數(shù)據(jù)返回了,就去接收數(shù)據(jù)
 def recv(self, key):
 """
 數(shù)據(jù)返回時執(zhí)行的回調(diào)函數(shù)
 :param key:
 :return:
 """
 # 接收數(shù)據(jù),但是只收了 1024 個字節(jié)
 # 如果實際返回的數(shù)據(jù)超過了 1024 個字節(jié)怎么辦?
 data = self.client.recv(1024)
 # 很簡單,只要數(shù)據(jù)沒收完,那么數(shù)據(jù)到來時就會可讀
 # 那么會再次調(diào)用此函數(shù),直到數(shù)據(jù)接收完為止
 # 注意:此時是非阻塞的,數(shù)據(jù)有多少就收多少
 # 沒有接收的數(shù)據(jù),會等到下一次再接收
 # 所以這里不能寫 while True
 if data:
 # 如果有數(shù)據(jù),那么寫入到 buffer 中
 self.buffer.write(data)
 else:
 # 否則說明數(shù)據(jù)讀完了,那么將注冊的回調(diào)取消
 self.selector.unregister(key.fd)
 # 此時就拿到了所有的數(shù)據(jù)
 all_data = self.buffer.getvalue()
 # 按照 rnrn 進行分隔得到列表
 # 第一個元素是響應(yīng)頭,第二個元素是響應(yīng)體
 result = all_data.split(b"rnrn")[1]
 print(f"result: {result.decode('utf-8')}")
 self.client.close()
 self.tasks["unfinished"] -= 1
 @classmethod
 def run_until_complete(cls):
 # 基于 IO 多路復(fù)用創(chuàng)建事件循環(huán)
 # 驅(qū)動內(nèi)核不斷輪詢 socket,檢測事件是否發(fā)生
 # 當(dāng)事件發(fā)生時,調(diào)用相應(yīng)的回調(diào)函數(shù)
 while cls.tasks["unfinished"]:
 # 輪詢,返回事件已經(jīng)就緒的 socket
 ready = cls.selector.select()
 # 這個 key 就是回調(diào)里面的 key
 for key, mask in ready:
 # 拿到回調(diào)函數(shù)并調(diào)用,這一步需要我們手動完成
 callback = key.data
 callback(key)
 # 因此當(dāng)事件發(fā)生時,調(diào)用綁定的回調(diào),就是這么實現(xiàn)的
 # 整個過程就是給 socket 綁定一個事件 + 回調(diào)
 # 事件循環(huán)不停地輪詢檢測,一旦事件發(fā)生就會告知我們
 # 但是調(diào)用回調(diào)不是內(nèi)核自動完成的,而是由我們手動完成的
 # "非阻塞 + 回調(diào) + 基于 IO 多路復(fù)用的事件循環(huán)"
 # 所有框架基本都是這個套路

一個簡單的 url 獲取,居然要寫這么多代碼,而它的好處就是性能高,因為不用把時間浪費在建立連接、等待數(shù)據(jù)上面。只要有事件發(fā)生,就會執(zhí)行相應(yīng)的回調(diào),極大地提高了 CPU 利用率。而且這是單線程,也沒有線程切換帶來的開銷。

那么下面測試一下吧。

import time
start = time.perf_counter()
for _ in range(10):
 # 這里面只是注冊了回調(diào),但還沒有真正執(zhí)行
 RequestHandler(url="https://localhost:9999/index").get_result()
# 創(chuàng)建事件循環(huán),驅(qū)動執(zhí)行
RequestHandler.run_until_complete()
end = time.perf_counter()
print(f"總耗時: {end - start}")

我用 FastAPI 編寫了一個服務(wù),為了更好地看到現(xiàn)象,服務(wù)里面刻意 sleep 了 1 秒。然后發(fā)送十次請求,看看效果如何。

Python協(xié)程是怎么實現(xiàn)的

總共耗時 1 秒鐘,我們再采用同步的方式進行編寫,看看效果如何。

Python協(xié)程是怎么實現(xiàn)的

可以看到回調(diào)的這種寫法性能非常高,但是它和我們傳統(tǒng)的同步代碼的寫法大相徑庭。如果是同步代碼,那么會先建立連接、然后發(fā)送數(shù)據(jù)、再接收數(shù)據(jù),這顯然更符合我們?nèi)祟惖乃季S,邏輯自上而下,非常自然。

但是回調(diào)的方式,就讓人很不適應(yīng),我們在建立完連接之后,不能直接發(fā)送數(shù)據(jù),必須將發(fā)送數(shù)據(jù)的邏輯放在一個單獨的函數(shù)(方法)中,然后再將這個函數(shù)以回調(diào)的方式注冊進去。

同理,在發(fā)送完數(shù)據(jù)之后,也不能立刻接收。同樣要將接收數(shù)據(jù)的邏輯放在一個單獨的函數(shù)中,然后再以回調(diào)的方式注冊進去。

所以好端端的自上而下的邏輯,因為回調(diào)而被分割的四分五裂,這種代碼在編寫和維護的時候是非常痛苦的。

比如回調(diào)可能會層層嵌套,容易陷入回調(diào)地獄,如果某一個回調(diào)執(zhí)行出錯了怎么辦?代碼的可讀性差導(dǎo)致不好排查,即便排查到了也難處理。

另外,如果多個回調(diào)需要共享一個變量該怎么辦?因為回調(diào)是通過事件循環(huán)調(diào)用的,在注冊回調(diào)的時候很難把變量傳過去。簡單的做法是把該變量設(shè)置為全局變量,或者說多個回調(diào)都是某個類的成員函數(shù),然后把共享的變量作為一個屬性綁定在 self 上面。但當(dāng)邏輯復(fù)雜時,就很容易導(dǎo)致全局變量滿天飛的問題。

所以這種模式就使得開發(fā)人員在編寫業(yè)務(wù)邏輯的同時,還要關(guān)注并發(fā)細節(jié)。

因此使用回調(diào)的方式編寫異步化代碼,雖然并發(fā)量能上去,但是對開發(fā)者很不友好;而使用同步的方式編寫同步代碼,雖然很容易理解,可并發(fā)量卻又上不去。那么問題來了,有沒有一種辦法,能夠讓我們在享受異步化帶來的高并發(fā)的同時,又能以同步的方式去編寫代碼呢?也就是我們能不能以同步的方式去編寫異步化的代碼呢?

答案是可以的,使用「協(xié)程」便可以辦到。協(xié)程在異步化之上包了一層外衣,兼顧了開發(fā)效率與運行效率。

協(xié)程是如何實現(xiàn)高并發(fā)的?

協(xié)程與異步編程相似的地方在于,它們必須使用非阻塞的系統(tǒng)調(diào)用與內(nèi)核交互,把切換請求的權(quán)力牢牢掌握在用戶態(tài)的代碼中。但不同的地方在于,協(xié)程把異步化中的兩段函數(shù),封裝為一個阻塞的協(xié)程函數(shù)。

這個函數(shù)執(zhí)行時,會使調(diào)用它的協(xié)程無感知地放棄執(zhí)行權(quán),由協(xié)程框架切換到其他就緒的協(xié)程繼續(xù)執(zhí)行。當(dāng)這個函數(shù)的結(jié)果滿足后,協(xié)程框架再選擇合適的時機,切換回它所在的協(xié)程繼續(xù)執(zhí)行。我們還是以讀取磁盤文件為例,看一張協(xié)程的示意圖:

Python協(xié)程是怎么實現(xiàn)的

看起來非常棒,所以異步化是通過回調(diào)函數(shù)來完成請求切換的,業(yè)務(wù)邏輯與并發(fā)實現(xiàn)關(guān)聯(lián)在一起,很容易出錯。而協(xié)程不需要什么「回調(diào)函數(shù)」,它允許用戶調(diào)用「阻塞的」協(xié)程方法,用同步編程方式寫業(yè)務(wù)邏輯。

再回到之前的那個 socket 發(fā)請求的例子,我們用協(xié)程的方式重寫一遍,看看它和基于回調(diào)的異步化編程有什么區(qū)別?

import time
from urllib.parse import urlparse
import asyncio
async def download(url):
 url = urlparse(url)
 # 域名:端口
 netloc = url.netloc
 if ":" not in netloc:
 host, port = netloc, 80
 else:
 host, port = netloc.split(":")
 path = url.path or "/"
 # 創(chuàng)建連接
 reader, writer = await asyncio.open_connection(host, port)
 # 發(fā)送數(shù)據(jù)
 payload = (f"GET {path} HTTP/1.1rn"
f"Host: {netloc}rn"
"Connection: closernrn")
 writer.write(payload.encode("utf-8"))
 await writer.drain()
 # 接收數(shù)據(jù)
 result = (await reader.read()).split(b"rnrn")[1]
 writer.close()
 print(f"result: {result.decode('utf-8')}")
# 以上就是發(fā)送請求相關(guān)的邏輯
# 我們看到代碼是自上而下的,沒有涉及到任何的回調(diào)
# 完全就像寫同步代碼一樣
async def main():
 # 發(fā)送 10 個請求
 await asyncio.gather(
 *[download("http://localhost:9999/index")
 for _ in range(10)]
 )
start = time.perf_counter()
# 同樣需要創(chuàng)建基于 IO 多路復(fù)用的事件循環(huán)
# 協(xié)程會被丟進事件循環(huán)中,依靠事件循環(huán)驅(qū)動執(zhí)行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.perf_counter()
print(f"總耗時: {end - start}")

代碼邏輯很好理解,和我們平時編寫的同步代碼沒有太大的區(qū)別,那么它的效率如何呢?

Python協(xié)程是怎么實現(xiàn)的

我們看到用了 3 秒鐘,比同步的方式快,但是比異步化的方式要慢。因為一開始就說過,協(xié)程并不比異步化的方式快,但我們之所以選擇它,是因為它的編程模型更簡單,能夠讓我們以同步的方式編寫異步的代碼。如果是基于回調(diào)方式的異步化,雖然性能很高(比如 Redis、Nginx),但對開發(fā)者是一個挑戰(zhàn)。

回到上面那個協(xié)程的例子中,我們一共發(fā)了 10 個請求,并在可能阻塞的地方加上了 await。意思就是,在執(zhí)行某個協(xié)程 await 后面的代碼時如果阻塞了,那么該協(xié)程會主動將執(zhí)行權(quán)交給事件循環(huán),然后事件循環(huán)再選擇其它的協(xié)程執(zhí)行。并且協(xié)程本質(zhì)上也是個單線程,雖然協(xié)程可以有多個,但是背后的線程只有一個。

協(xié)程是如何切換的?

那么問題來了,協(xié)程的切換是如何完成的呢?

實際上,用戶態(tài)的代碼切換協(xié)程,與內(nèi)核切換線程的原理是一樣的。內(nèi)核通過管理 CPU 的寄存器來切換線程,我們以最重要的棧寄存器和指令寄存器為例,看看協(xié)程切換時如何切換程序指令與內(nèi)存。

每個線程有獨立的棧,而棧既保留了變量的值,也保留了函數(shù)的調(diào)用關(guān)系、參數(shù)和返回值,CPU 中的棧寄存器 SP 指向了當(dāng)前線程的棧,而指令寄存器 IP 保存著下一條要執(zhí)行的指令地址。

因此,從線程 1 切換到線程 2 時,首先要把 SP、IP 寄存器的值為線程 1 保存下來,再從內(nèi)存中找出線程 2 上一次切換前保存好的寄存器的值,并寫入 CPU 的寄存器,這樣就完成了線程切換(其他寄存器也需要管理、替換,原理與此相同,不再贅述)。

協(xié)程的切換與此相同,只是把內(nèi)核的工作轉(zhuǎn)移到協(xié)程框架來實現(xiàn)而已,下圖是協(xié)程切換前的狀態(tài):

Python協(xié)程是怎么實現(xiàn)的

當(dāng)遇到阻塞時會進行協(xié)程切換,從協(xié)程 1 切換到協(xié)程 2 后的狀態(tài)如下圖所示:

Python協(xié)程是怎么實現(xiàn)的

創(chuàng)建協(xié)程時,會從進程的堆中分配一段內(nèi)存作為協(xié)程的棧。線程的棧有 8MB,而協(xié)程棧的大小通常只有幾十 KB。而且,C 庫內(nèi)存池也不會為協(xié)程預(yù)分配內(nèi)存,它感知不到協(xié)程的存在。這樣,更低的內(nèi)存占用空間為高并發(fā)提供了保證,畢竟十萬并發(fā)請求,就意味著 10 萬個協(xié)程。

另外??s小后,就盡量不要使用遞歸函數(shù),也不能在棧中申請過多的內(nèi)存,這是實現(xiàn)高并發(fā)必須付出的代價。當(dāng)然啦,如果能像 Go 一樣,協(xié)程??梢宰杂缮炜s的話,就不用擔(dān)心了。

由此可見,協(xié)程就是用戶態(tài)的線程。然而,為了保證所有切換都在用戶態(tài)進行,協(xié)程必須重新封裝所有的阻塞系統(tǒng)調(diào)用,否則一旦協(xié)程觸發(fā)了線程切換,會導(dǎo)致這個線程進入休眠狀態(tài),進而其上的所有協(xié)程都得不到執(zhí)行。

比如普通的 sleep 函數(shù)會讓當(dāng)前線程休眠,由內(nèi)核來喚醒線程,而協(xié)程化改造后,sleep 只會讓當(dāng)前協(xié)程休眠,由協(xié)程框架在指定時間后喚醒協(xié)程,所以在 Python 的協(xié)程里面我們不能寫 time.sleep,而是應(yīng)該寫 asyncio.sleep。再比如,線程間的互斥鎖是使用信號量實現(xiàn)的,而信號量也會導(dǎo)致線程休眠,協(xié)程化改造互斥鎖后,同樣由框架來協(xié)調(diào)、同步各協(xié)程的執(zhí)行。

所以協(xié)程的高性能,建立在切換必須由用戶態(tài)代碼完成之上,這要求協(xié)程生態(tài)是完整的,要盡量覆蓋常見的組件。

還是以 Python 為例,我經(jīng)常看見有人在 async def 里面寫 requests.get 發(fā)請求,這是不對的。requests.get 底層調(diào)用的是同步阻塞的 socket,這會使得線程阻塞,而線程一旦阻塞,就會導(dǎo)致所有的協(xié)程阻塞,此時就等價于串行。所以把它放在 async def 里面沒有任何意義,正確的做法是使用 aiohttp 或 httpx。因此如果想使用協(xié)程,那么需要重新封裝底層的系統(tǒng)調(diào)用,如果實在沒辦法就扔到線程池中運行。

再比如 MySQL 官方提供的客戶端 SDK,它使用了阻塞 socket 做網(wǎng)絡(luò)訪問,會導(dǎo)致線程休眠,必須用非阻塞 socket 把 SDK 改造為協(xié)程函數(shù)后,才能在協(xié)程中使用。

當(dāng)然,并不是所有的函數(shù)都能用協(xié)程改造,比如磁盤的異步 IO 讀。它雖然是非阻塞的,但無法使用 PageCache,反而降低了系統(tǒng)吞吐量。如果使用緩存 IO 讀文件,在沒有命中 PageCache 時是可能發(fā)生阻塞的。這種時候,如果對性能有更高的要求,就需要把線程與協(xié)程結(jié)合起來用,把可能阻塞的操作扔到線程池中執(zhí)行,通過生產(chǎn)者 / 消費者模型與協(xié)程配合工作。

實際上,面對多核系統(tǒng),也需要協(xié)程與線程配合工作。因為協(xié)程的載體是線程,而一個線程同一時間只能使用一顆 CPU,所以通過開啟更多的線程,將所有協(xié)程分布在這些線程中,就能充分使用 CPU 資源。有過 Go 語言使用經(jīng)驗的話,應(yīng)該很清楚這一點。

除此之外,為了讓協(xié)程獲得更多的 CPU 時間,還可以設(shè)置所在線程的優(yōu)先級,比如在 Linux 中把線程的優(yōu)先級設(shè)置到 -20,就可以每次獲得更長的時間片。另外 CPU 緩存對程序性能也是有影響的,為了減少 CPU 緩存失效的比例,還可以把線程綁定到某個 CPU 上,增加協(xié)程執(zhí)行時命中 CPU 緩存的機率。

雖然這里一直說協(xié)程框架在調(diào)度協(xié)程,然而你會發(fā)現(xiàn),很多協(xié)程庫只提供了創(chuàng)建、掛起、恢復(fù)執(zhí)行等基本方法,并沒有協(xié)程框架的存在,需要業(yè)務(wù)代碼自行調(diào)度協(xié)程。這是因為,這些通用的協(xié)程庫(比如 asyncio)并不是專為服務(wù)器設(shè)計的,服務(wù)器中可以由客戶端網(wǎng)絡(luò)連接的建立,驅(qū)動著創(chuàng)建出協(xié)程,同時伴隨著請求的結(jié)束而終止。

而在協(xié)程的運行條件不滿足時,多路復(fù)用框架會將它掛起,并根據(jù)優(yōu)先級策略選擇另一個協(xié)程執(zhí)行。因此,使用協(xié)程實現(xiàn)服務(wù)器端的高并發(fā)服務(wù)時,并不只是選擇協(xié)程庫,還要從其生態(tài)中找到結(jié)合 IO 多路復(fù)用的協(xié)程框架(比如 Tornado),這樣可以加快開發(fā)速度。

一句話總結(jié)協(xié)程

從廣義上講,協(xié)程是一種輕量級的并發(fā)模型,說的比較高大上。但從狹義上講,協(xié)程就是調(diào)用一個可以暫停并切換的函數(shù)。像我們使用 async def 定義的就是一個協(xié)程函數(shù),本質(zhì)上也是個函數(shù),而調(diào)用協(xié)程函數(shù)就會得到一個協(xié)程。

將協(xié)程丟進事件循環(huán),由事件循環(huán)驅(qū)動執(zhí)行,一旦發(fā)生阻塞,便將執(zhí)行權(quán)主動交給事件循環(huán),事件循環(huán)再驅(qū)動其它協(xié)程執(zhí)行。所以自始至終都只有一個線程,而協(xié)程只不過是我們參考線程的結(jié)構(gòu),在用戶態(tài)模擬出來的。

所以調(diào)用一個普通函數(shù),會一直將內(nèi)部的代碼邏輯全部執(zhí)行完;而調(diào)用一個協(xié)程函數(shù),在內(nèi)部出現(xiàn)了阻塞,那么會切換到其它的協(xié)程。

但是協(xié)程出現(xiàn)阻塞能夠切換有一個重要的前提,就是這個阻塞不能涉及任何的系統(tǒng)調(diào)用,比如 time.sleep、同步的 socket 等等。這些都需要內(nèi)核參與,而內(nèi)核一旦參與了,那么造成的阻塞就不單單是阻塞某個協(xié)程那么簡單了(OS 也感知不到協(xié)程),而是會使線程阻塞。線程一旦阻塞,在其之上的所有協(xié)程都會阻塞,由于協(xié)程是以線程作為載體的,實際執(zhí)行的肯定是線程,如果每個協(xié)程都會使得線程阻塞,那么此時不就相當(dāng)于串行了嗎?

所以想使用協(xié)程,必須將阻塞的系統(tǒng)調(diào)用重新封裝,我們舉個栗子:

@app.get(r"/index1")
async def index1():
 time.sleep(30)
 return "index1"
@app.get(r"/index2")
async def index2():
 return "index2"

這是一個基于 FastAPI 編寫的服務(wù),我們只看視圖函數(shù)。如果我們先訪問 /index1,然后訪問 /index2,那么必須等到 30 秒之后,/index2 才會響應(yīng)。因為這是一個單線程,/index1 里面的 time.sleep 會觸發(fā)系統(tǒng)調(diào)用,使得整個線程都進入阻塞,線程一旦阻塞了,所有的協(xié)程就都別想執(zhí)行了。

如果將上面的例子改一下:

@app.get(r"/index1")
async def index():
 await asyncio.sleep(30)
 return "index1"
@app.get(r"/index2")
async def index():
 return "index2"

訪問 /index1 依舊會進行 30 秒的休眠,但此時再訪問 /index2 的話則是立刻返回。原因是 asyncio.sleep(30) 重新封裝了阻塞的系統(tǒng)調(diào)用,此時的休眠是在用戶態(tài)完成的,沒有經(jīng)過內(nèi)核。換句話說,此時只會導(dǎo)致協(xié)程休眠,不會導(dǎo)致線程休眠,那么當(dāng)訪問 /index2 的時候,對應(yīng)的協(xié)程會立刻執(zhí)行,然后返回結(jié)果。

同理我們在發(fā)網(wǎng)絡(luò)請求的時候,也不能使用 requests.get,因為它會導(dǎo)致線程阻塞。當(dāng)然,還有一些數(shù)據(jù)庫的驅(qū)動,例如 pymysql, psycopg2 等等,這些阻塞的都是線程。為此,在開發(fā)協(xié)程項目時,我們應(yīng)該使用 aiohttp, asyncmy, asyncpg 等等。

為什么早期 Python 的協(xié)程都沒有人用,原因就是協(xié)程想要運行,必須基于協(xié)程庫 asyncio,但問題是 asyncio 只支持發(fā)送 TCP 請求(對于協(xié)程庫而言足夠了)。如果你想通過網(wǎng)絡(luò)連接到某個組件(比如數(shù)據(jù)庫、Redis),只能手動發(fā) TCP 請求,而且這些組件對發(fā)送的數(shù)據(jù)還有格式要求,返回的數(shù)據(jù)也要手動解析,可以想象這是多么麻煩的事情。

如果想解決這一點,那么必須基于 asyncio 重新封裝一個 SDK。所以同步 SDK 和協(xié)程 SDK 最大的區(qū)別就是,一個是基于同步阻塞的 socket,一個是基于 asyncio。比如 redis 和 aioredis,連接的都是 Redis,只是在 TCP 層面發(fā)送數(shù)據(jù)的方式不同,至于其它方面則是類似的。

而早期,還沒有出現(xiàn)這些協(xié)程 SDK,自己封裝的話又是一個龐大的工程,所以 Python 的協(xié)程用起來就很艱難,因為達不到期望的效果。不像 Go 在語言層面上就支持協(xié)程,一個 go 關(guān)鍵字就搞定了。而且 Python 里面一處異步、處處異步,如果某處的阻塞切換不了,那么協(xié)程也就沒有意義了。

但現(xiàn)在 Python 已經(jīng)進化到 3.10 了,協(xié)程相關(guān)的生態(tài)也越來越完善,感謝這些開源的作者們。發(fā)送網(wǎng)絡(luò)請求、連接數(shù)據(jù)庫、編寫 web 服務(wù)等等,都有協(xié)程化的 SDK 和框架,現(xiàn)在完全可以開發(fā)以協(xié)程為主導(dǎo)的項目了。

以上就是關(guān)于“Python協(xié)程是怎么實現(xiàn)的”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

本文題目:Python協(xié)程是怎么實現(xiàn)的
轉(zhuǎn)載注明:http://muchs.cn/article34/ijdcpe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、標(biāo)簽優(yōu)化、網(wǎng)站內(nèi)鏈、品牌網(wǎng)站設(shè)計、網(wǎng)站維護自適應(yīng)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)