日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python并发之协程gevent基础(5)

發布時間:2024/2/28 python 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python并发之协程gevent基础(5) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1,gevent介紹

gevent是第三方庫,通過?greenlet?實現?coroutine,創建、調度的開銷比?線程(thread)?還小,因此程序內部的?執行流?效率高。

gevent 實現了 python 標準庫中一些阻塞庫的非阻塞版本,如 socket、os、select 等 (全部的可參考?gevent1.0 的 monkey.py 源碼),可用這些非阻塞的庫替代 python 標準庫中的阻塞的庫。

gevent 提供的 API 與 python 標準庫中的用法和名稱類似。

其基本思想是:當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程序處于等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

gevent是基于協程的Python網絡庫。特點:

基于libev的快速事件循環(Linux上epoll,FreeBSD上kqueue)。
基于greenlet的輕量級執行單元。
API的概念和Python標準庫一致(如事件,隊列)。
可以配合socket,ssl模塊使用。
能夠使用標準庫和第三方模塊創建標準的阻塞套接字(gevent.monkey)。
默認通過線程池進行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環境變量開啟)。
TCP/UDP/HTTP服務器
子進程支持(通過gevent.subprocess)
線程池
gevent常用方法:

gevent.spawn()?? ?創建一個普通的Greenlet對象并切換
gevent.spawn_later(seconds=3)?? ?延時創建一個普通的Greenlet對象并切換
gevent.spawn_raw()?? ?創建的協程對象屬于一個組
gevent.getcurrent()?? ?返回當前正在執行的greenlet
gevent.joinall(jobs)?? ?將協程任務添加到事件循環,接收一個任務列表
gevent.wait()?? ?可以替代join函數等待循環結束,也可以傳入協程對象列表
gevent.kill()?? ?殺死一個協程
gevent.killall()?? ?殺死一個協程列表里的所有協程
monkey.patch_all()?? ?非常重要,會自動將python的一些標準模塊替換成gevent框架
greenlet常用實例方法:

# Greenlet對象
from gevent import Greenlet
?
# Greenlet對象創建
job = Greenlet(target0, 3)
Greenlet.spawn() # 創建一個協程并啟動
Greenlet.spawn_later(seconds=3) # 延時啟動
?
# 協程啟動
job.start() # 將協程加入循環并啟動協程
job.start_later(3) # 延時啟動
?
# 等待任務完成
job.join() # 等待任務完成
job.get() # 獲取協程返回的值
?
# 任務中斷和判斷任務狀態
job.dead() # 判斷協程是否死亡
job.kill() # 殺死正在運行的協程并喚醒其他的協程,這個協程將不會再執行,可以
job.ready() # 任務完成返回一個真值
job.successful() # 任務成功完成返回真值,否則拋出錯誤
?
# 獲取屬性
job.loop # 時間循環對象
job.value # 獲取返回的值
?
# 捕捉異常
job.exception # 如果運行有錯誤,獲取它
job.exc_info # 錯誤的詳細信息
?
# 設置回調函數
job.rawlink(back) # 普通回調,將job對象作為回調函數的參數
job.unlink() # 刪除回調函數
# 執行成功的回調函數
job.link_value(back)
# 執行失敗的回調函數
job.link_exception(back)
?gevent.Pool的特殊方法:

pool.wait_available():等待直到有一個協程有結果
pool.dd(greenlet):向進程池添加一個方法并跟蹤,非阻塞
pool.discard(greenlet):停止跟蹤某個協程
pool.start(greenlet):加入并啟動協程
pool.join():阻塞等待結束
pool.kill():殺死所有跟蹤的協程
pool.killone(greenlet):殺死一個協程
2,什么時候用/不用gevent

gevent 的優勢:

可以通過同步的邏輯實現并發操作,大大降低了編寫并行/并發程序的難度
在一個進程中使用 gevent 可以有效避免對?臨界資源?的互斥訪問
如果程序涉及較多的 I/O,可用 gevent 替代多線程來提高程序效率。但由于

gevent 中 coroutine 的調度是由使用者而非操作系統決定
主要解決的是 I/O 問題,提高?IO-bound?類型的程序的效率
由于是在一個進程中實現 coroutine,且操作系統以進程為單位分配處理機資源 (一個進程分配一個處理機)
因此,gevent 不適合在以下場景中使用:

對任務延遲有要求的場景,如交互式程序中 (此時需要操作系統進行?公平調度)
CPU-bound?任務
當需要使用多處理機時 (可通過運行多個進程,每個進程內實現 coroutine 來解決這個問題)
3,gevent操作

如何生成 greenlet instance

一般有兩種方法:

使用?gevent.spawn()?API
subclass?Greenlet
第一種方法是調用了?Greenlet?class 中的?spawn?類方法,且生成 greenlet instance 后將其放入 coroutine 的調度隊列中。第二種方法需要手動通過?instance.start()?方法手動將其加入到 coroutine 的調度隊列中。
代碼示例:

import gevent
from gevent import Greenlet
?
?
class MyGreen(Greenlet):
? ? def __init__(self, timeout, msg):
? ? ? ? Greenlet.__init__(self)
? ? ? ? self.timeout = timeout
? ? ? ? self.msg = msg
?
? ? def _run(self):
? ? ? ? print("I'm from subclass of Greenlet and want to say: %s" % (self.msg,))
? ? ? ? gevent.sleep(self.timeout)
? ? ? ? print("I'm from subclass of Greenlet and done!")
?
?
class TestMultigreen(object):
? ? def __init__(self, timeout=0):
? ? ? ? self.timeout = timeout
?
? ? def run(self):
? ? ? ? green0 = gevent.spawn(self._task, 0, 'just 0 test') #方式一:使用gevent的spawn方法創建greenlet實例
? ? ? ? green1 = Greenlet.spawn(self._task, 1, 'just 1 test') #方式一:使用Greenlet的spawn方法創建greenlet實例
? ? ? ? green2 = MyGreen(self.timeout, 'just 2 test') #方式二:使用自定義的Greenlet子類創建實例,需要調用start()手動將greenlet實例加入到 coroutine 的調度隊列中
? ? ? ? green2.start()
?
? ? ? ? gevent.joinall([green0, green1, green2])
? ? ? ? print('Tasks done!')
?
? ? def _task(self, pid, msg):
? ? ? ? print("I'm task %d and want to say: %s" % (pid, msg))
? ? ? ? gevent.sleep(self.timeout)
? ? ? ? print("Task %d done." % (pid,))
?
?
if __name__ == '__main__':
? ? test = TestMultigreen()
? ? test.run()
需要注意:

若僅是想生成 greenlet instance 并置于調度隊列中,最好采用?gevent.spawn()?API
若想僅生成 greenlet instance 且暫時不想加入到調度隊列,則可采用第二種方法。之后若想將其加入到調度隊列,則手動執行?instance.start()?方法。
如何進行主線程到 hub greenlet instance 的切換

gevent.sleep()
Greenlet 或 Greenlet 子類的 instance 的?join()?方法
monkey patch 的庫或方法 (參見?monkey.py):
socket
ssl
os.fork
time.sleep
select.select
thread
subprocess
sys.stdin,sys.stdout,sys.stderr
4,gevent核心功能

Greenlets
同步和異步執行
確定性
創建Greenlets
Greenlet狀態
程序停止
超時
猴子補丁
4.1,Greenlets

在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

????一個 “greenlet” 是一個小型的獨立偽線程。可以把它想像成一些棧幀,棧底是初始調用的函數,而棧頂是當前greenlet的暫停位置。你使用greenlet創建一堆這樣的堆棧,然后在他們之間跳轉執行。跳轉必須顯式聲明的:一個greenlet必須選擇要跳轉到的另一個greenlet,這會讓前一個掛起,而后一個在此前掛起處恢復執行。不同greenlets之間的跳轉稱為切換(switching) 。

??????greenlet不是一種真正的并發機制,而是在同一線程內,在不同函數的執行代碼塊之間切換,實施“你運行一會、我運行一會”,并且在進行切換時必須指定何時切換以及切換到哪。

greenlet類主要有兩個方法:

switch:用來切換協程;

throw():用來拋出異常同時終止程序;

from greenlet import greenlet
import time
?
def test1(gr,g):
? ? for i in range(100):
? ? ? ? print("---A--")
? ? ? ? gr.switch(g, gr) # 切換到另一個協程執行
? ? ? ? time.sleep(0.5)
?
def test2(gr, g):
? ? for i in range(100):
? ? ? ? print("---B--")
? ? ? ? gr.switch(g, gr)
? ? ? ? # gr.throw(AttributeError)
? ? ? ? time.sleep(0.5)
?
if __name__ == '__main__':
? ? # 創建一個協程1
? ? gr1 = greenlet(test1)
? ? # 創建一個協程2
? ? gr2 = greenlet(test2)
? ? # 啟動協程
? ? gr1.switch(gr2, gr1)
?

4.2,同步和異步執行

并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。在gevent里面,上下文切換是通過yielding來完成的.

當我們在受限于網絡或IO的函數中使用gevent,這些函數會被協作式的調度, gevent的真正能力會得到發揮。Gevent處理了所有的細節, 來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。

示例如下:

例子中的select()函數通常是一個在各種文件描述符上輪詢的阻塞調用。

import time
import gevent
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():
? ? print('Started Polling: %s' % tic())
? ? select.select([], [], [], 1)
? ? print('Ended Polling: %s' % tic())
def gr2():
? ? print('Started Polling: %s' % tic())
? ? select.select([], [], [], 2)
? ? print('Ended Polling: %s' % tic())
def gr3():
? ? print("Hey lets do some stuff while the greenlets poll, %s" % tic())
? ? gevent.sleep(3)
? ? print('Ended Polling: %s' % tic())
gevent.joinall([
? ? gevent.spawn(gr1),
? ? gevent.spawn(gr2),
? ? gevent.spawn(gr3),
])
輸出:

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 1.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 3.0 seconds
同步vs異步

下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。

import gevent
import random
?
def task(pid):
? ? gevent.sleep(random.randint(0,2)*0.001)
? ? print('task {} done'.format(pid))
?
def synchronous():
? ? for i in range(5):
? ? ? ? task(i)
?
def asynchronous():
? ? gev_list = [gevent.spawn(task, i) for i in range(5)]
? ? gevent.joinall(gev_list)
?
print("synchronous:")
synchronous()
?
print("asynchronous:")
asynchronous()
運行結果:

synchronous:
task 0 done
task 1 done
task 2 done
task 3 done
task 4 done
asynchronous:
task 4 done
task 3 done
task 0 done
task 1 done
task 2 done
上例中,在同步的部分,所有的task都同步的執行, 結果當每個task在執行時主流程被阻塞(主流程的執行暫時停住)。

程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall?函數,后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。

要重點留意的是,異步的部分本質上是隨機的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執行。

?

4.3,確定性

greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產生相同的輸出。

下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。

import time
def echo(i):
? ? time.sleep(0.001)
? ? return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
print(run1 == run2 == run3 == run4)
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
print(run1 == run2 == run3 == run4)
執行結果

False
True
即使gevent通常帶有確定性,當開始與如socket或文件等外部服務交互時, 不確定性也可能溜進你的程序中。因此盡管gevent線程是一種“確定的并發”形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。

涉及并發長期存在的問題就是競爭條件(race condition)(當兩個并發線程/進程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件),這會導致資源修改的結果狀態依賴于時間和執行順序。 這個問題,會導致整個程序行為變得不確定。

解決辦法: 始終避免所有全局的狀態.

?

4.4,創建Greenlets

gevent對Greenlet初始化提供了一些封裝.

import gevent
from gevent import Greenlet
def foo(message, n):
? ? gevent.sleep(n)
? ? print(message)
thread1 = Greenlet.spawn(foo, "Hello", 1)
thread2 = gevent.spawn(foo, "I live!", 2)
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
gevent.joinall(threads)
執行結果:

Hello
I live!
除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。

import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):
? ? def __init__(self, message, n):
? ? ? ? Greenlet.__init__(self)
? ? ? ? self.message = message
? ? ? ? self.n = n
? ? def _run(self):
? ? ? ? print(self.message)
? ? ? ? gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
執行結果

Hi there!
4.5,Greenlet狀態

greenlet的狀態通常是一個依賴于時間的參數:

started – Boolean, 指示此Greenlet是否已經啟動
ready() – Boolean, 指示此Greenlet是否已經停止
successful() – Boolean, 指示此Greenlet是否已經停止而且沒拋異常
value – 任意值, 此Greenlet代碼返回的值
exception – 異常, 此Greenlet內拋出的未捕獲異常
代碼示例:

import gevent
?
def win():
? ? return 'win game'
def fail():
? ? raise Exception('You failed.')
?
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started)
print(loser.started)
# Greenlet異常會保存在Greenlet,不會上拋給主進程.
try:
? ? gevent.joinall([winner, loser])
except Exception as e:
? ? print('This will never be reached') #此處不能捕獲Greenlet異常,永遠不會觸發
?
print(loser.exception) #Greenlet異常
?
print(winner.value) ?# 'You win!'
print(loser.value) ?# None
?
print(winner.ready()) ?# True
print(loser.ready()) ?# True
print(winner.successful()) ?# True
print(loser.successful()) ?# False
執行結果

True
True
You failed.
win game
None
True
True
True
False
Traceback (most recent call last):
? File "src/gevent/greenlet.py", line 716, in gevent._greenlet.Greenlet.run
? File "coroutine.py", line 121, in fail
? ? raise Exception('You failed.')
Exception: You failed.
2019-01-22T09:05:05Z <Greenlet "Greenlet-0" at 0x103d02848: fail> failed with Exception
4.6,程序停止
當主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執行。這導致了所謂的僵尸進程, 它需要在Python解釋器之外被kill掉。

通用的處理模式就是在主程序中監聽SIGQUIT信號,調用gevent.shutdown退出程序。

import gevent
import signal
def run_forever():
? ? gevent.sleep(1000)
? ??
if __name__ == '__main__':
? ? gevent.signal(signal.SIGQUIT, gevent.shutdown)
? ? thread = gevent.spawn(run_forever)
? ? thread.join()
4.7,超時

通過超時可以對代碼塊兒或一個Greenlet的運行時間進行約束。

import gevent
from gevent import Timeout
seconds = 3
timeout = Timeout(seconds)
timeout.start()
?
def wait():
? ? gevent.sleep(4)
?
try:
? ? gevent.spawn(wait).join()
except Timeout:
? ? print('Could not complete')
執行結果:

Could not complete
超時類

import gevent
from gevent import Timeout
?
time_to_wait = 5
?
class TimeLong(Exception):
? ? pass
?
with Timeout(time_to_wait, TimeLong):
? ? gevent.sleep(6)
4.8,猴子補丁(Monkey patching)

我們現在來到gevent的死角了. 在此之前,我已經避免提到猴子補丁(monkey patching) 以嘗試使gevent這個強大的協程模型變得生動有趣,但現在到了討論猴子補丁的黑色藝術 的時候了。你之前可能注意到我們提到了monkey.patch_socket()這個命令,這個 純粹副作用命令是用來改變標準socket庫的。

import socket
print(socket.socket)
print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)
?
import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)
執行結果:

<class 'socket.socket'>
After monkey patch
<class 'gevent._socket3.socket'>
<built-in function select>
After monkey patch
<function select at 0x1074631e0>
Python的運行環境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個一般說來令人驚奇的壞主意,因為它創造了“隱式的副作用”,如果出現問題 它很多時候是極難調試的。雖然如此,在極端情況下當一個庫需要修改Python本身 的基礎行為的時候,猴子補丁就派上用場了。在這種情況下,gevent能夠修改標準庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。

例如,Redis的python綁定一般使用常規的tcp socket來與redis-server實例通信。 通過簡單地調用gevent.monkey.patch_all(),可以使得redis的綁定協作式的調度 請求,與gevent棧的其它部分一起工作。

這讓我們可以將一般不能與gevent共同工作的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。

?

參考文獻:

https://blog.csdn.net/xumesang/article/details/53288363

http://blog.chinaunix.net/uid-9162199-id-4738168.html

https://www.cnblogs.com/cwp-bg/p/9593405.html

點贊 3
————————————————
版權聲明:本文為CSDN博主「達西布魯斯」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/biheyu828/article/details/86593413

總結

以上是生活随笔為你收集整理的Python并发之协程gevent基础(5)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。