python io多路复用_Python之IO多路复用
一、IO模型介紹
? 同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區別?這個問題其實不同的人給出的答案都可能不同,比如wiki,就認為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同,并且在討論這個問題的時候上下文(context)也不相同。所以,為了更好的回答這個問題,我先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。本文最重要的參考文獻是Richard Stevens的“UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2節“I/O Models ”,Stevens在這節中詳細說明了各種IO的特點和區別,如果英文夠好的話,推薦直接閱讀。Stevens的文風是有名的深入淺出,所以不用擔心看不懂。本文中的流程圖也是截取自參考文獻。
Stevens在文章中一共比較了五種IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路復用
* signal driven IO 信號驅動IO(不常見,不講)
* asynchronous IO 異步IO
由signal driven IO(信號驅動IO)在實際中并不常用,所以主要介紹其余四種IO Model。
? 再說一下IO發生時涉及的對象和步驟。對于一個network IO (這里我們以read、recv舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read/recv讀數據的操作發生時,該操作會經歷兩個階段:
1)等待數據準備 (Waiting for the data to be ready)
2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
記住這兩點很重要,因為這些IO模型的區別就是在兩個階段上各有不同的情況。
補充:
#1、輸入操作:read、readv、recv、recvfrom、recvmsg共5個函數,如果會阻塞狀態,則會經理wait data和copy data兩個階段,如果設置為非阻塞則在wait 不到data時拋出異常
#2、輸出操作:write、writev、send、sendto、sendmsg共5個函數,在發送緩沖區滿了會阻塞在原地,如果設置為非阻塞,則會拋出異常
#3、接收外來鏈接:accept,與輸入操作類似
#4、發起外出鏈接:connect,與輸出操作類似
二、阻塞IO(Blocking IO)
? 在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:(recvfrom和tcp里面的recv在這些IO模型里面是一樣的)。
上面的圖形分析:兩個階段的阻塞
? 當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對于network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。
而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
? 所以,blocking IO的特點就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。
這里我們回顧一下同步/異步/阻塞/非阻塞:
同步:提交一個任務之后要等待這個任務執行完畢
異步:只管提交任務,不等待這個任務執行完畢就可以去做其他的事情
阻塞:recv、recvfrom、accept,線程階段 運行狀態-->阻塞狀態-->就緒
非阻塞:沒有阻塞狀態
在一個線程的IO模型中,我們recv的地方阻塞,我們就開啟多線程,但是不管你開啟多少個線程,這個recv的時間是不是沒有被規避掉,不管是多線程還是多進程都沒有規避掉這個IO時間。
? 幾乎所有的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,使用這些接口可以很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的。如下圖
ps:所謂阻塞型接口是指系統調用(一般是IO接口)不返回調用結果并讓當前線程一直阻塞,只有當該系統調用獲得結果或者超時出錯時才返回。
實際上,除非特別指定,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將無法執行任何運算或響應任何的網絡請求。
一個簡單的解決方案:
在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),這樣任何一個連接的阻塞都不會影響其他的連接。
該方案的問題是:
開啟多進程或都線程的方式,在遇到要同時響應成百上千路的連接請求,則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,而且線程與進程本身也更容易進入假死狀態。
改進方案:
很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,并讓空閑的線程重新承擔新的執行任務。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。
改進后方案其實也存在著問題:
“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應并不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,并根據響應規模調整“池”的大小。
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。
三、非阻塞IO(non-Blocking IO)
Linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
? 從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,并不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,于是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),然后返回。
也就是說非阻塞的recvform系統調用調用之后,進程并沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要注意,拷貝數據整個過程,進程仍然是屬于阻塞的狀態。
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。
非阻塞IO示例:
服務端
# 服務端
import socket
import time
server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
server.setblocking(False) #設置不阻塞
r_list=[] #用來存儲所有來請求server端的conn連接
w_list={} #用來存儲所有已經有了請求數據的conn的請求數據
while 1:
try:
conn,addr=server.accept() #不阻塞,會報錯
r_list.append(conn) #為了將連接保存起來,不然下次循環的時候,上一次的連接就沒有了
except BlockingIOError:
# 強調強調強調:!!!非阻塞IO的精髓在于完全沒有阻塞!!!
# time.sleep(0.5) # 打開該行注釋純屬為了方便查看效果
print('在做其他的事情')
print('rlist: ',len(r_list))
print('wlist: ',len(w_list))
# 遍歷讀列表,依次取出套接字讀取內容
del_rlist=[] #用來存儲刪除的conn連接
for conn in r_list:
try:
data=conn.recv(1024) #不阻塞,會報錯
if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據
conn.close()
del_rlist.append(conn)
continue
w_list[conn]=data.upper()
except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
continue
except ConnectionResetError: # 當前套接字出異常,則關閉,然后加入刪除列表,等待被清除
conn.close()
del_rlist.append(conn)
# 遍歷寫列表,依次取出套接字發送內容
del_wlist=[]
for conn,data in w_list.items():
try:
conn.send(data)
del_wlist.append(conn)
except BlockingIOError:
continue
# 清理無用的套接字,無需再監聽它們的IO操作
for conn in del_rlist:
r_list.remove(conn)
#del_rlist.clear() #清空列表中保存的已經刪除的內容
for conn in del_wlist:
w_list.pop(conn)
#del_wlist.clear()
客戶端
#客戶端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))
while 1:
res=('%s hello' %os.getpid()).encode('utf-8')
client.send(res)
data=client.recv(1024)
print(data.decode('utf-8'))
##多線程的客戶端請求版本
# def func():
# sk = socket.socket()
# sk.connect(('127.0.0.1',9000))
# sk.send(b'hello')
# time.sleep(1)
# print(sk.recv(1024))
# sk.close()
#
# for i in range(20):
# threading.Thread(target=func).start()
雖然我們上面的代碼通過設置非阻塞,規避了IO操作,但是非阻塞IO模型絕不被推薦。
我們不能否定其優點:能夠在等待任務完成的時間里干其他活了(包括提交其他任務,也就是 “后臺” 可以有多個任務在“”同時“”執行)。
但是也難掩其缺點:
#1. 循環調用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況
#2. 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。
此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復用模式,可以一次檢測多個連接是否活躍。
四、多路復用IO(IO multiplexing)(重點)
? IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在于單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:
先看解釋圖,里面的select就像個代理。
? 用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實并沒有太大的不同,事實上還更差一些。因為它不僅阻塞了還多需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom),當只有一個連接請求的時候,這個模型還不如阻塞IO效率高。但是,用select的優勢在于它可以同時處理多個connection,而阻塞IO那里不能,我不管阻塞不阻塞,你所有的連接包括recv等操作,我都幫你監聽著(以什么形式監聽的呢?先不要考慮,下面會說),其中任何一個有變動(有鏈接,有數據),我就告訴你用戶,那么你就可以去調用這個數據了,這就是他的NB之處。這個IO多路復用模型機制是操作系統幫我們提供的,在windows上有這么個機制叫做select,那么如果我們想通過自己寫代碼來控制這個機制或者自己寫這么個機制,我們可以使用python中的select模塊來完成上面這一系列代理的行為。在一切皆文件的unix下,這些可以接收數據的對象或者連接,都叫做文件描述符fd。
強調:
1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢并不是對于單個連接能處理得更快,而是在于能處理更多的連接。
2. 在多路復用模型中,對于每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
1.Python中的select模塊
import select
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
參數:可接受四個參數(前三個必須)
rlist: wait until ready for reading #等待讀的對象,你需要監聽的需要獲取數據的對象列表
wlist: wait until ready for writing #等待寫的對象,你需要寫一些內容的時候,input等等,也就是說我會循環他看看是否有需要發送的消息,如果有我取出這個對象的消息并發送出去,一般用不到,這里我們也給一個[]。
xlist: wait for an “exceptional condition” #等待異常的對象,一些額外的情況,一般用不到,但是必須傳,那么我們就給他一個[]。
timeout: 超時時間
當超時時間 = n(正整數)時,那么如果監聽的句柄均無任何變化,則select會阻塞n秒,之后返回三個空列表,如果監聽的句柄有變化,則直接執行。
返回值:三個列表與上面的三個參數列表是對應的
? select方法用來監視文件描述符(當文件描述符條件不滿足時,select會阻塞),當某個文件描述符狀態改變后,會返回三個列表
1、當參數1 序列中的fd滿足“可讀”條件時,則獲取發生變化的fd并添加到fd_r_list中
2、當參數2 序列中含有fd時,則將該序列中所有的fd添加到 fd_w_list中
3、當參數3 序列中的fd發生錯誤時,則將該發生錯誤的fd添加到 fd_e_list中
4、當超時時間為空,則select會一直阻塞,直到監聽的句柄發生變化
結論: select的優勢在于可以處理多個連接,不適用于單個連接
2.select網絡IO示例
服務端:
#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設置為非阻塞
server.setblocking(False)
# 初始化將服務端socket對象加入監聽列表,后面還要動態添加一些conn連接對象,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {} #存放客戶端發送過來的消息
wlist=[] #等待寫對象
wdata={} #存放要返回給客戶端的消息
print('預備!監聽!!!')
count = 0 #寫著計數用的,為了看實驗效果用的,沒用
while True:
# 開始 select 監聽,對rlist中的服務端server進行監聽,select函數阻塞進程,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手信號,從而變得可讀,滿足select函數的“可讀”條件),被觸發的(有動靜的)套接字(服務器套接字)返回給了rl這個返回值里面;
rl,wl,xl=select.select(rlist,wlist,[],0.5)
print('%s 次數>>'%(count),wl)
count = count + 1
# 對rl進行循環判斷是否有客戶端連接進來,當有客戶端連接進來時select將觸發
for sock in rl:
# 判斷當前觸發的是不是socket對象, 當觸發的對象是socket對象時,說明有新客戶端accept連接進來了
if sock == server:
# 接收客戶端的連接, 獲取客戶端對象和客戶端地址信息
conn,addr=sock.accept()
#把新的客戶端連接加入到監聽列表中,當客戶端的連接有接收消息的時候,select將被觸發,會知道這個連接有動靜,有消息,那么返回給rl這個返回值列表里面。
rlist.append(conn)
else:
# 由于客戶端連接進來時socket接收客戶端連接請求,將客戶端連接加入到了監聽列表中(rlist),客戶端發送消息的時候這個連接將觸發
# 所以判斷是否是客戶端連接對象觸發
try:
data=sock.recv(1024)
#沒有數據的時候,我們將這個連接關閉掉,并從監聽列表中移除
if not data:
sock.close()
rlist.remove(sock)
continue
print("received {0} from client {1}".format(data.decode(), sock))
#將接受到的客戶端的消息保存下來
rdata[sock] = data.decode()
#將客戶端連接對象和這個對象接收到的消息加工成返回消息,并添加到wdata這個字典里面
wdata[sock]=data.upper()
#需要給這個客戶端回復消息的時候,我們將這個連接添加到wlist寫監聽列表中
wlist.append(sock)
#如果這個連接出錯了,客戶端暴力斷開了(注意,我還沒有接收他的消息,或者接收他的消息的過程中出錯了)
except Exception:
#關閉這個連接
sock.close()
#在監聽列表中將他移除,因為不管什么原因,它畢竟是斷開了,沒必要再監聽它了
rlist.remove(sock)
# 如果現在沒有客戶端請求連接,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否需要發送消息
for sock in wl:
sock.send(wdata[sock])
wlist.remove(sock)
wdata.pop(sock)
# #將一次select監聽列表中有接收數據的conn對象所接收到的消息打印一下
# for k,v in rdata.items():
# print(k,'發來的消息是:',v)
# #清空接收到的消息
# rdata.clear()
客戶端:
#客戶端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8'))
client.close()
select監聽fd變化的過程分析:
#用戶進程創建socket對象,拷貝監聽的fd到內核空間,每一個fd會對應一張系統文件表,內核空間的fd響應到數據后,就會發送信號給用戶進程數據已到;
#用戶進程再發送系統調用,比如(accept)將內核空間的數據copy到用戶空間,同時作為接受數據端內核空間的數據清除,這樣重新監聽時fd再有新的數據又可以響應到了(發送端因為基于TCP協議所以需要收到應答后才會清除)。
該模型的優點:
#相比其他模型,使用select() 的事件驅動模型只用單線程(進程)執行,占用資源少,不消耗太多 CPU,同時能夠為多客戶端提供服務。如果試圖建立一個簡單的事件驅動的服務器程序,這個模型有一定的參考價值。
該模型的缺點:
#首先select()接口并不是實現“事件驅動”的最好選擇。因為當需要探測的句柄值較大時,select()接口本身需要消耗大量時間去輪詢各個句柄。很多操作系統提供了更為高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要實現更高效的服務器程序,類似epoll這樣的接口更被推薦。遺憾的是不同的操作系統特供的epoll接口有很大差異,所以使用類似于epoll的接口實現具有較好跨平臺能力的服務器會比較困難。
#其次,該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。
select做得事情和第二階段的阻塞沒有關系,就是從內核態將數據拷貝到用戶態的阻塞,始終幫你做得監聽的工作,幫你節省了一些第一階段阻塞的時間。
IO多路復用的機制:
select機制: Windows、Linux
poll機制 : Linux #和lselect監聽機制一樣,但是對監聽列表里面的數量沒有限制,select默認限制是1024個,但是他們兩個都是操作系統輪詢每一個被監聽的文件描述符(如果數量很大,其實效率不太好),看是否有可讀操作。
epoll機制 : Linux #它的監聽機制和上面兩個不同,他給每一個監聽的對象綁定了一個回調函數,你這個對象有消息,那么觸發回調函數給用戶,用戶就進行系統調用來拷貝數據,并不是輪詢監聽所有的被監聽對象,這樣的效率高很多。
五、異步IO(Asynchronous IO)
Linux下的asynchronous IO其實用得不多,從內核2.6版本才開始引入。先看一下它的流程:
用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel操作系統會等待數據(阻塞)準備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
貌似異步IO這個模型很牛~~但是你發現沒有,這不是我們自己代碼控制的,都是操作系統完成的,而python在copy數據這個階段沒有提供操縱操作系統的接口,所以用python沒法實現這套異步IO機制,其他幾個IO模型都沒有解決第二階段的阻塞(用戶態和內核態之間copy數據),但是C語言是可以實現的,因為大家都知道C語言是最接近底層的,雖然我們用python實現不了,但是python仍然有異步的模塊和框架(tornado、twstied,高并發需求的時候用),這些模塊和框架很多都是用底層的C語言實現的,它幫我們實現了異步,你只要使用就可以了,但是你要知道這個異步是不是很好呀,不需要你自己等待了,操作系統幫你做了所有的事情,你就直接收數據就行了,就像你有一張銀行卡,銀行定期給你打錢一樣。
六、IO模型比較分析
? 到目前為止,已經將四個IO Model都介紹完了。現在回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這兩者的區別。調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還準備數據的情況下會立刻返回。
? 再說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區別就在于synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,四個IO模型可以分為兩大類,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬于synchronous IO這一類,而 asynchronous I/O后一類 。
有人可能會說,non-blocking IO并沒有被block啊。這里有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有準備好,這時候不會block進程。但是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不一樣,當進程發起IO 操作之后,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。
各個IO Model的比較如圖所示:
? 經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,并且當數據準備完成以后,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。
七、selectors模塊
IO復用:為了解釋這個名詞,首先來理解下復用這個概念,復用也就是共用的意思,這樣理解還是有些抽象,為此,咱們來理解下復用在通信領域的使用,在通信領域中為了充分利用網絡連接的物理介質,往往在同一條網絡鏈路上采用時分復用或頻分復用的技術使其在同一鏈路上傳輸多路信號,到這里我們就基本上理解了復用的含義,即公用某個“介質”來盡可能多的做同一類(性質)的事,那IO復用的“介質”是什么呢?為此我們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,因此為了解決大量客戶端訪問的問題,引入了IO復用技術,即:一個進程可以同時對多個客戶請求進行服務。也就是說IO復用的“介質”是進程(準確的說復用的是select和poll,因為進程也是靠調用select和poll來實現的),復用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是并發的但是IO所需的讀寫數據多數情況下是沒有準備好的,因此就可以利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據可以進行讀寫了,進程就來對這樣的IO進行服務。
理解完IO復用后,我們在來看下實現IO復用中的三個API(select、poll和epoll)的區別和聯系:
select,poll,epoll都是IO多路復用的機制,I/O多路復用就是通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知應用程序進行相應的讀寫操作。但select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒后自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需自己負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。
1.select
select的原型:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select的第一個參數nfds為fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制為__FD_SETSIZE(1024),位數組的每一位代表其對應的描述符是否需要被檢查。第二三四參數表示需要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用于標示哪些描述符上發生了關注的事件,所以每次調用select前都需要重新初始化fdset。timeout參數為超時時間,該結構會被內核修改,其值為超時剩余的時間。
select的調用步驟:
(1)使用copy_from_user從用戶空間拷貝fdset到內核空間
(2)注冊回調函數__pollwait
(3)遍歷所有fd,調用其對應的poll方法(對于socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或者datagram_poll)
(4)以tcp_poll為例,其核心實現就是__pollwait,也就是上面注冊的回調函數。
(5)__pollwait的主要工作就是把current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對于tcp_poll 來說,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中并不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)后,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。
(6)poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。
(7)如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫后,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則調用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。
(8)把fd_set從內核空間拷貝到用戶空間。
select的幾個缺點:
(1)每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大
(2)同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
(3)select支持的文件描述符數量太小了,默認是1024
2.poll
poll的原型:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
? poll與select不同,通過一個pollfd數組向內核傳遞需要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用于標示關注的事件和發生的事件,故pollfd數組只需要被初始化一次。
? poll的實現機制與select類似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,然后對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。poll返回后,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。
3.epoll
? 直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
epoll的原型:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
? epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎么解決的呢?在此之前,我們先看一下epoll 和select和poll的調用接口上的不同,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是創建一個epoll句柄;epoll_ctl是注 冊要監聽的事件類型;epoll_wait則是等待事件的產生。
對于第一個缺點,epoll的解決方案在epoll_ctl函數中。每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把所有的fd拷貝進內核,而不是在epoll_wait的時候重復拷貝。epoll保證了每個fd在整個過程中只會拷貝 一次。
對于第二個缺點,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)并為每個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是類似的)。
對于第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大于2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統內存關系很大。
epoll實現代碼示例:(了解即可)
#!/usr/bin/env python
import select
import socket
response = b''
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
# 因為socket默認是阻塞的,所以需要使用非阻塞(異步)模式。
serversocket.setblocking(0)
# 創建一個epoll對象
epoll = select.epoll()
# 在服務端socket上面注冊對讀event的關注。一個讀event隨時會觸發服務端socket去接收一個socket連接
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
# 字典connections映射文件描述符(整數)到其相應的網絡連接對象
connections = {}
requests = {}
responses = {}
while True:
# 查詢epoll對象,看是否有任何關注的event被觸發。參數“1”表示,我們會等待1秒來看是否有event發生。
# 如果有任何我們感興趣的event發生在這次查詢之前,這個查詢就會帶著這些event的列表立即返回
events = epoll.poll(1)
# event作為一個序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個整數。
for fileno, event in events:
# 如果是服務端產生event,表示有一個新的連接進來
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
print('client connected:', address)
# 設置新的socket為非阻塞模式
connection.setblocking(0)
# 為新的socket注冊對讀(EPOLLIN)event的關注
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
# 初始化接收的數據
requests[connection.fileno()] = b''
# 如果發生一個讀event,就讀取從客戶端發送過來的新數據
elif event & select.EPOLLIN:
print("------recvdata---------")
# 接收客戶端發送過來的數據
requests[fileno] += connections[fileno].recv(1024)
# 如果客戶端退出,關閉客戶端連接,取消所有的讀和寫監聽
if not requests[fileno]:
connections[fileno].close()
# 刪除connections字典中的監聽對象
del connections[fileno]
# 刪除接收數據字典對應的句柄對象
del requests[connections[fileno]]
print(connections, requests)
epoll.modify(fileno, 0)
else:
# 一旦完成請求已收到,就注銷對讀event的關注,注冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回復數據給客戶端
epoll.modify(fileno, select.EPOLLOUT)
# 打印完整的請求,證明雖然與客戶端的通信是交錯進行的,但數據可以作為一個整體來組裝和處理
print('-' * 40 + '\n' + requests[fileno].decode())
# 如果一個寫event在一個客戶端socket上面發生,它會接受新的數據以便發送到客戶端
elif event & select.EPOLLOUT:
print("-------send data---------")
# 每次發送一部分響應數據,直到完整的響應數據都已經發送給操作系統等待傳輸給客戶端
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
if len(requests[fileno]) == 0:
# 一旦完整的響應數據發送完成,就不再關注寫event
epoll.modify(fileno, select.EPOLLIN)
# HUP(掛起)event表明客戶端socket已經斷開(即關閉),所以服務端也需要關閉。
# 沒有必要注冊對HUP event的關注。在socket上面,它們總是會被epoll對象注冊
elif event & select.EPOLLHUP:
print("end hup------")
# 注銷對此socket連接的關注
epoll.unregister(fileno)
# 關閉socket連接
connections[fileno].close()
del connections[fileno]
finally:
# 打開的socket連接不需要關閉,因為Python會在程序結束的時候關閉。這里顯式關閉是一個好的代碼習慣
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
---------------------
本文來自 richard1ybb 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/richard1ybb/article/details/74573200?utm_source=copy
總結
(1)select,poll實現需要自己不斷輪詢所有fd集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,并喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的 時候只要判斷一下就緒鏈表是否為空就行了,這節省了大量的CPU時間,這就是回調機制帶來的性能提升。
(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,并且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,而且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這里的等待隊列并不是設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省不少的開銷。
4.selector
? 這三種IO多路復用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認選擇當前平臺下最合適的,我們只需要寫監聽誰,然后怎么發送消息接收消息,但是具體怎么監聽的,選擇的是select還是poll還是epoll,這是selector幫我們自動選擇的
selector代碼示例:
服務端
#服務端
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口為非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當于網select的讀列表里append了一個文件句柄server_fileobj,并且綁定了一個回調函數accept
while True:
events=sel.select() #檢測所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
客戶端
#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
小練習:基于selectors模塊實現并發的FTP
總結
以上是生活随笔為你收集整理的python io多路复用_Python之IO多路复用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: excel流程图分叉 合并_Excel和
- 下一篇: python 整合excel_pytho