Python进阶(5)_进程与线程之协程、I/O模型
三、協程
3.1協程概念
協程:又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用于協程
?
協程的好處:
-
無需線程上下文切換的開銷
-
無需原子操作鎖定及同步的開銷方便切換控制流,簡化編程模型
"原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
-
高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。
協程的缺點:
-
無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
-
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
?
協程定義或標準(滿足1,2,3就可稱為協程):
必須在只有一個單線程里實現并發
修改共享數據不需加鎖
用戶程序里自己保存多個控制流的上下文棧
一個協程遇到IO操作自動切換到其它協程
“上下文”,指的是程序在執行中的一個狀態。通常我們會用調用棧來表示這個狀態——棧記載了每個調用層級執行到哪里,還有執行時的環境情況等所有有關的信息。
“上下文切換”,表達的是一種從一個上下文切換到另一個上下文執行的技術。而“調度”指的是決定哪個上下文可以獲得接下去的CPU時間的方法。
?
與線程比較:
1. python的線程屬于內核級別的,即由操作系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其他線程運行)
2. 單線程內開啟協程,一旦遇到io,從應用程序級別(而非操作系統)控制切換
?
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換,優點如下:
1. ?協程的切換開銷更小,屬于程序級別的切換,操作系統完全感知不到,因而更加輕量級
2. 單線程內就可以實現并發的效果,最大限度地利用cpu
?
用yield生成器函數實現單線程下保存程序的運行狀態:
import timedef consumer():r = ''while True:n = yield rprint('[CONSUMER] ←← Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):next(c)n = 0while n < 5:n = n + 1print('[PRODUCER] →→ Producing %s...' % n)cr = c.send(n) # cr="200 ok"print('[PRODUCER] Consumer return: %s' % cr)c.close()if __name__=='__main__':c=consumer() # c:生成器對象produce(c)?
3.2 greenlet類實現協程
greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍后使用next()或send()操作進行恢復為止。可以使用一個調度器循環在一組生成器函數之間協作多個任務。greentlet是python中實現我們所謂的"Coroutine(協程)"的一個基礎庫.
?
用greenlet類實現協程舉例:
from greenlet import greenletdef test1():print (12)gr2.switch()print (34)gr2.switch()def test2():print (56)gr1.switch()print (78)gr1 = greenlet(test1) gr2 = greenlet(test2)gr1.switch()>>:12563478?
3.3 基于greenlet類用 gevent模塊實現協程
Python通過yield提供了對協程的基本支持,但是不完全。而第三方的gevent為Python提供了比較完善的協程支持。
gevent是第三方庫,通過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程序處于等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
由于切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:
用gevent模塊實現爬蟲
from gevent import monkey monkey.patch_all() import requests,gevent,timedef foo(url):respnse=requests.get(url)respnse_str=respnse.textprint("GET data %s"%len(respnse_str))s=time.time() gevent.joinall([gevent.spawn(foo,"https://itk.org/"),gevent.spawn(foo, "https://www.github.com/"),gevent.spawn(foo, "https://baidu.com/")])print(time.time()-s)上例中還可以用gevent.sleep(2)來模擬gevent可以識別的i/o阻塞
而time.sleep(2)或其他的阻塞 gevent是不能直接識別的,需要添加補丁,添加補丁代碼如下:
from gevent import monkey monkey.patch_all()補丁代碼必須放在導入其他模塊之前,及放在文件開頭
?
附:用進程池、多線程、協程爬蟲時間比較
from gevent import monkey monkey.patch_all() import requests import re from multiprocessing import Pool import time,threading import geventdef getpage(res):response_str=requests.get(res)print('ecdoing is :',response_str.encoding)return response_str.textdef js(ret):li=[]for item in ret:dic={'title':item[2],'date':item[1],'評論數':item[0]}li.append(dic)f=open('acfun.txt','a',encoding='utf-8')for i in li:f.write(str(i))f.write('\n')f.close()def run(n):url='http://www.acfun.cn/v/list73/index_%s.htm'%nprint(url)response=getpage(url)# response=response.encode('ISO-8859-1').decode('utf-8')obj=re.compile('<span class="a">(\d+)</span>.*?<a href=.*? target=".*?" title="發布于 (.*?)" class="title">(.*?)</a>',re.S)# obj = re.compile(r'<img.*?src=.(\S+\.jpg).*?', re.S)ret=obj.findall(response)# print(ret)return js(ret)if __name__ == '__main__':start_time=time.time()#順序執行# start_time=time.time()# for j in range(1,100):# run(j)# #順序執行cost time: 51.30734419822693#多線程并發執行# li=[]# for j in range(1,100):# j = threading.Thread(target=run, args=(j,))# j.start()# li.append(j)# for obj in li:# obj.join()# 并發執行不使用join cost time: 0.20418000221252441# 并發執行使用join cost time: 4.524945974349976#使用進程池# p = Pool(5)# for i in range(1,100):# p.apply_async(func=run,args=(i,))# p.close()# p.join()#使用進程池cost time: 6.876262426376343#使用協程li = []for i in range(1, 100):li.append(gevent.spawn(run, i))gevent.joinall(li)#使用協程第一次cost time: 4.432950973510742#使用協程第二次cost time: 30.864907264709473#使用協程第三次cost time: 13.472567558288574end_time=time.time()print('cost time:', end_time-start_time) 使用多線程、進程池、協程爬蟲時間比較?
四、I/O模型
Linux環境下的network IO?Model分為:
- ??? blocking IO
- ??? nonblocking IO
- ??? IO multiplexing
- ??? signal driven IO
- ??? asynchronous IO
由于signal driven IO在實際中并不常用,所以我這只提及剩下的四種IO Model。
再說一下IO發生時涉及的對象和步驟。
對于一個network IO (這里我們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,它會經歷兩個階段:
- ?等待數據準備 (Waiting for the data to be ready)
- ?將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
記住這兩點很重要,因為這些IO Model的區別就是在兩個階段上各有不同的情況。
4.1 blocking IO (阻塞IO)
在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對于network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
blocking IO的特點:在IO執行的兩個階段都被block了,全程阻塞
?
?
4.2 non-blocking IO(非阻塞IO)
linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,并不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,于是它可以再次發送read操作。一旦kernel中的數據準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。所以,用戶進程其實是需要不斷的主動詢問kernel數據好了沒有。
?
優點:能夠在等待任務完成的時間里干其他活了(包括提交其他任務,也就是 “后臺” 可以有多個任務在同時執行)。
缺點:任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。
?
4.3 IO multiplexing(IO多路復用)
?IO multiplexing這個詞可能有點陌生,但是如果我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式為event driven IO。我們都知道,select/epoll的好處就在于單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:
?
當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實并沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在于它可以同時處理多個connection。
(所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢并不是對于單個連接能處理得更快,而是在于能處理更多的連接。)
在IO multiplexing Model中,實際中,對于每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
結論: select的優勢在于可以處理多個連接,不適用于單個連接?
?4.4 Asynchronous I/O(異步IO)
?linux下的asynchronous IO其實用得很少。先看一下它的流程:?
?
用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據準備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
4.5 IO模型比較分析
?各個IO Model的比較如圖所示:
?
4.6 selectors模塊
import selectors import socketsel = selectors.DefaultSelector()def accept(sock, mask):conn, addr = sock.accept() # Should be readyprint('accepted', conn, 'from', addr)conn.setblocking(False)sel.register(conn, selectors.EVENT_READ, read)def read(conn, mask):data = conn.recv(1000) # Should be readyif data:print('echoing', repr(data), 'to', conn)conn.send(data) # Hope it won't blockelse:print('closing', conn)sel.unregister(conn)conn.close()sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept)while True:events = sel.select()for key, mask in events:callback = key.datacallback(key.fileobj, mask)
?
轉載于:https://www.cnblogs.com/hedeyong/p/7214345.html
總結
以上是生活随笔為你收集整理的Python进阶(5)_进程与线程之协程、I/O模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Azkaban使用简单笔记
- 下一篇: python pip 升级