日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 中 异步协程 的 使用方法介绍

發布時間:2024/7/23 python 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 中 异步协程 的 使用方法介绍 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

靜覓 崔慶才的個人博客:Python中異步協程的使用方法介紹:https://cuiqingcai.com/6160.html

Python 異步 IO 、協程、asyncio、async/await、aiohttp:https://blog.csdn.net/freeking101/article/details/85286199

?

?

?

1. 前言

?

??????? 在執行一些 IO 密集型任務的時候,程序常常會因為等待 IO 而阻塞。比如在網絡爬蟲中,如果我們使用 requests 庫來進行請求的話,如果網站響應速度過慢,程序一直在等待網站響應,最后導致其爬取效率是非常非常低的。

??????? 為了解決這類問題,本文就來探討一下 Python 中異步協程來加速的方法,此種方法對于 IO 密集型任務非常有效。如將其應用到網絡爬蟲中,爬取效率甚至可以成百倍地提升。

??????? 注:本文協程使用 async/await 來實現,需要 Python 3.5 及以上版本。

?

?

2. 基本了解

?

在了解異步協程之前,我們首先得了解一些基礎概念,如 阻塞非阻塞同步異步多進程協程

?

2.1 阻塞

??????? 阻塞狀態指程序未得到所需計算資源時被掛起的狀態。程序在等待某個操作完成期間,自身無法繼續干別的事情,則稱該程序在該操作上是阻塞的。

??????? 常見的阻塞形式有:網絡 I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,所有的進程都無法真正干事情,它們也會被阻塞。如果是多核 CPU 則正在執行上下文切換操作的核不可被利用。

?

2.2 非阻塞

程序在等待某操作過程中,自身不被阻塞,可以繼續運行干別的事情,則稱該程序在該操作上是非阻塞的。

非阻塞并不是在任何程序級別、任何情況下都可以存在的。

僅當程序封裝的級別可以囊括獨立的子程序單元時,它才可能存在非阻塞狀態。

非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導致的耗時與效率低下,我們才要把它變成非阻塞的。

?

2.3 同步 (?同步?意味著?有序

不同程序單元為了完成某個任務,在執行過程中需靠某種通信方式以協調一致,稱這些程序單元是同步執行的。

例如購物系統中更新商品庫存,需要用 "鎖"?作為通信信號,讓不同的更新請求強制排隊順序執行,更新庫存的操作是同步的。

簡言之,同步意味著有序。

?

2.4 異步 (?異步 意味著 無序

為完成某個任務,不同程序單元之間過程中無需通信協調,也能完成任務的方式,不相關的程序單元之間可以是異步的。

例如:爬蟲下載網頁。調度程序調用下載程序后,即可調度其他任務,而無需與該下載任務保持通信以協調行為。不同網頁的下載、保存等操作都是無關的,也無需相互通知協調。這些異步操作的完成時刻并不確定。

簡言之,異步意味著無序。

?

2.5 多進程

多進程就是利用 CPU 的多核優勢,在同一時間并行地執行多個任務,可以大大提高執行效率。

?

2.6 協程

協程,英文叫做 Coroutine,又稱 微線程纖程,協程是一種用戶態的輕量級線程。

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧因此協程能保留上一次調用時的狀態,即所有局部狀態的一個特定組合,每次過程重入時,就相當于進入上一次調用的狀態。

協程本質上是個單進程,協程相對于多進程來說,無需線程上下文切換的開銷,無需原子操作鎖定及同步的開銷,編程模型也非常簡單。

我們可以使用協程來實現異步操作,比如在網絡爬蟲場景下,我們發出一個請求之后,需要等待一定的時間才能得到響應,但其實在這個等待過程中,程序可以干許多其他的事情,等到響應得到之后才切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是異步協程的優勢。

?

?

3. 異步協程用法

?

接下來讓我們來了解下協程的實現,從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器對象為基礎的,在 Python 3.5 則增加了 async / await,使得協程的實現更加方便。

Python 中使用協程最常用的庫莫過于 asyncio,所以本文會以 asyncio 為基礎來介紹協程的使用。

首先我們需要了解下面幾個概念:

  • event_loop:事件循環相當于一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足條件發生的時候,就會調用對應的處理方法。
  • coroutine:中文翻譯叫協程在 Python 中常指代為協程對象類型,我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。
  • task:任務它是對協程對象的進一步封裝,包含了任務的各個狀態。
  • future:代表將來執行或沒有執行的任務的結果實際上和 task 沒有本質區別。

另外我們還需要了解 async / await 關鍵字,它是從 Python 3.5 才出現的,專門用于定義協程。

其中,async 定義一個協程await 用來掛起阻塞方法的執行。

?

3.1 定義協程

首先我們來定義一個協程,體驗一下它和普通進程在實現上的不同之處,代碼如下:

import asyncioasync def execute(x):print(f'Number:{x}')coroutine = execute(1) print(coroutine) print('After calling execute')loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop')

運行結果:

<coroutine object execute at 0x00000247201D3740> After calling execute Number:1 After calling loop

說明:

  • 首先引入?asyncio 包,這樣才可以使用 async 和 await。然后使用 async 定義了一個 execute() 方法
  • 調用execute方法(?coroutine = execute(1) ),然而這個方法并沒有執行,而是返回了一個 coroutine 協程對象
  • 使用 get_event_loop() 方法創建了一個事件循環 loop,并調用了 loop 對象的 run_until_complete() 方法將協程注冊到事件循環 loop 中,然后啟動。最后我們才看到了 execute() 方法打印了輸出結果。

可見 async 定義的方法就會變成一個無法直接執行的 coroutine 對象,必須將其注冊到事件循環中才可以執行。

在上面還提到了 task,它是對 coroutine 對象的進一步封裝,它里面相比 coroutine 對象多了運行狀態,比如 running、finished 等,我們可以用這些狀態來獲取協程對象的執行情況。

上面的例子中,我們將 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 對象,我們也可以顯式地進行聲明,如下所示:

import asyncioasync def execute(x):print(f'Number:{x}')return xcoroutine = execute(1) print(f'Coroutine: {coroutine}') print('After calling execute')loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print(f'Task: {task}') loop.run_until_complete(task) print(f'Task: {task}') print('After calling loop')

運行結果:

Coroutine:?<coroutine?object?execute?at?0x10e0f7830> After?calling?execute Task:?<Task?pending?coro=<execute()?running?at?demo.py:4>> Number:?1 Task:?<Task?finished?coro=<execute()?done,?defined?at?demo.py:4>?result=1> After?calling?loop

? ? ? ? 這里我們定義了 loop 對象之后,接著調用了它的 create_task() 方法將 coroutine 對象轉化為了 task 對象,隨后我們打印輸出一下,發現它是 pending 狀態。接著我們將 task 對象添加到事件循環中得到執行,隨后我們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還可以看到其 result 變成了 1,也就是我們定義的 execute() 方法的返回結果。

? ? ? ? 另外定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future() 方法,返回結果也是 task 對象,這樣的話我們就可以不借助于 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對象,寫法如下:

import asyncioasync def execute(x):print(f'Number: {x}')return xcoroutine = execute(1) print(f'Coroutine: {coroutine}') print('After calling execute')task = asyncio.ensure_future(coroutine) print(f'Task: {task}') loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f'Task: {task}') print('After calling loop')

運行結果:

Coroutine:?<coroutine?object?execute?at?0x10aa33830> After?calling?execute Task:?<Task?pending?coro=<execute()?running?at?demo.py:4>> Number:?1 Task:?<Task?finished?coro=<execute()?done,?defined?at?demo.py:4>?result=1> After?calling?loop

發現其效果都是一樣的。

?

3.2 綁定回調

另外我們也可以為某個 task 綁定一個回調方法,來看下面的例子:

import asyncio import requestsasync def request():url = 'https://www.baidu.com'response = requests.get(url)return response.status_codedef callback(t_task):print('status_code:', t_task.result())coroutine = request() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) print('Task:', task)loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task)

說明:

定義一個 request() 方法,請求百度,返回狀態碼,同時這個方法里面我們沒有任何 print() 語句。

然后定義一個 callback() 方法,這個方法接收一個參數,是 task 對象,然后調用 print() 方法打印了 task 對象的結果。這樣我們就定義好了一個 coroutine對象一個回調方法

我們現在希望的效果是,當 coroutine 對象執行完畢之后,就去執行聲明的 callback() 方法。那么它們二者怎樣關聯起來呢?

很簡單,只需要調用 add_done_callback() 方法即可,我們將 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之后就可以調用 callback() 方法了,同時 task 對象還會作為參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就可以獲取返回結果了。

運行結果:

Task:?<Task?pending?coro=<request()?running?at?demo.py:5>?cb=[callback()?at?demo.py:11]> status_code: 200 Task:?<Task?finished?coro=<request()?done,?defined?at?demo.py:5>?result=200>

實際上不用回調方法,直接在 task 運行完畢之后也可以直接調用 result() 方法獲取結果,如下所示:

import asyncio import requestsasync def request():url = 'https://www.baidu.com'status = requests.get(url)return statuscoroutine = request() task = asyncio.ensure_future(coroutine) print(f'Task: {task}')loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f'Task: {task}') print(f'Task Result: {task.result()}')

運行結果是一樣的:

Task:?<Task?pending?coro=<request()?running?at?demo.py:4>> Task:?<Task?finished?coro=<request()?done,?defined?at?demo.py:4>?result=<Response?[200]>> Task?Result:?<Response?[200]>

?

3.3 多任務協程

上面的例子我們只執行了一次請求,如果我們想執行多次請求應該怎么辦呢?我們可以定義一個 task 列表,然后使用 asyncio 的 wait() 方法即可執行,看下面的例子:

import asyncio import requestsasync def request():url = 'https://www.baidu.com'status = requests.get(url)return statustasks = [asyncio.ensure_future(request()) for _ in range(5)]# print(f'Tasks: {tasks}') list(map(lambda x: print(x), tasks))loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print('Task Result:', task.result())

這里我們使用一個 for 循環創建了五個 task,組成了一個列表,然后把這個列表首先傳遞給了 asyncio 的 wait() 方法,然后再將其注冊到時間循環中,就可以發起五個任務了。最后我們再將任務的運行結果輸出出來,運行結果如下:

<Task pending name='Task-1' coro=<request() running at demo.py:5>> <Task pending name='Task-2' coro=<request() running at demo.py:5>> <Task pending name='Task-3' coro=<request() running at demo.py:5>> <Task pending name='Task-4' coro=<request() running at demo.py:5>> <Task pending name='Task-5' coro=<request() running at demo.py:5>> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]>

可以看到五個任務被順次執行了,并得到了運行結果。

?

3.4 協程實現

? ? ? ? 前面說了這么一通,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并沒有看出協程的優勢啊?反而寫法上更加奇怪和麻煩了,別急,上面的案例只是鋪墊,接下來我們正式來看下協程在解決 IO 密集型任務上有怎樣的優勢吧!

? ? ? ? 上面的代碼中,我們用一個網絡請求作為示例,這就是一個耗時等待的操作,因為我們請求網頁之后需要等待頁面響應并返回結果。耗時等待的操作一般都是 IO 操作,比如文件讀取、網絡請求等等。協程對于處理這種操作是有很大優勢的,當遇到需要等待的情況的時候,程序可以暫時掛起,轉而去執行其他的操作,從而避免一直等待一個程序而耗費過多的時間,充分利用資源。

? ? ? ? 為了表現出協程的優勢,我們需要先創建一個合適的實驗環境,最好的方法就是模擬一個需要等待一定時間才可以獲取返回結果的網頁,上面的代碼中使用了百度,但百度的響應太快了,而且響應速度也會受本機網速影響,所以最好的方式是自己在本地模擬一個慢速服務器,這里我們選用 Flask。

如果沒有安裝 Flask 的話可以執行如下命令安裝:

pip3 install flask

然后編寫服務器代碼如下:

from flask import Flask import timeapp = Flask(__name__)@app.route('/') def index():time.sleep(3)return 'Hello!'if __name__ == '__main__':app.run(threaded=True)

這里我們定義了一個 Flask 服務,主入口是 index() 方法,方法里面先調用了 sleep() 方法休眠 3 秒,然后接著再返回結果,也就是說,每次請求這個接口至少要耗時 3 秒,這樣我們就模擬了一個慢速的服務接口。

注意這里服務啟動的時候,run() 方法加了一個參數 threaded,這表明 Flask 啟動了多線程模式,不然默認是只有一個線程的。如果不開啟多線程模式,同一時刻遇到多個請求的時候,只能順次處理,這樣即使我們使用協程異步請求了這個服務,也只能一個一個排隊等待,瓶頸就會出現在服務端。所以,多線程模式是有必要打開的。

啟動之后,Flask 應該默認會在 127.0.0.1:5000 上運行,運行之后控制臺輸出結果如下:

* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

接下來我們再重新使用上面的方法請求一遍:

import asyncio import requests import timestart = time.time()async def request():url = 'http://127.0.0.1:5000'print(f'Waiting for {url}')response = requests.get(url)print(f'Get response from {url}, Result: {response.text}')tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print(f'Cost time: {end - start}')

在這里我們還是創建了五個 task,然后將 task 列表傳給 wait() 方法并注冊到時間循環中執行。

運行結果如下:

Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Cost time: 15.090105056762695

可以發現和正常的請求并沒有什么兩樣,依然還是順次執行的,耗時 15 秒,平均一個請求耗時 3 秒,說好的異步處理呢?

其實,要實現異步處理,我們得先要有掛起的操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣我們才能充分利用好資源,上面方法都是一本正經的串行走下來,連個掛起都沒有,怎么可能實現異步?想太多了。

要實現異步,接下來我們再了解一下 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await,事件循環?就會將 本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。

所以,我們可能會將代碼中的 request() 方法改成如下的樣子:

async def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)response = await requests.get(url)print('Get response from', url, 'Result:', response.text)

僅僅是在 requests 前面加了一個 await,然而執行以下代碼,會得到如下報錯:

Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Cost?time:?15.048935890197754 Task?exception?was?never?retrieved future:?<Task?finished?coro=<request()?done,?defined?at?demo.py:7>?exception=TypeError("object?Response?can't? be?used?in?'await'?expression",)> Traceback?(most?recent?call?last):File?"demo.py",?line?10,?in?requeststatus?=?await?requests.get(url) TypeError:?object?Response?can't?be?used?in?'await'?expression

這次它遇到 await 方法確實掛起了,也等待了,但是最后卻報了這么個錯,這個錯誤的意思是 requests 返回的 Response 對象不能和 await 一起使用,為什么呢?因為根據官方文檔說明,await 后面的對象必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象
  • A generator-based coroutine object returned from a function decorated with types.coroutine(),一個由 types.coroutine() 修飾的生成器,這個生成器可以返回 coroutine 對象。
  • An object with an await__ method returning an iterator,一個包含 __await 方法的對象返回的一個迭代器。

可以參見:https://www.python.org/dev/peps/pep-0492/#await-expression。

reqeusts 返回的 Response 不符合上面任一條件,因此就會報上面的錯誤了。

那么有的小伙伴就發現了,既然 await 后面可以跟一個 coroutine 對象,那么我用 async 把請求的方法改成 coroutine 對象不就可以了嗎?所以就改寫成如下的樣子:

import asyncio import requests import timestart = time.time()async def get(url):return requests.get(url)async def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)response = await get(url)print('Get response from', url, 'Result:', response.text)tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print('Cost time:', end - start)

這里我們將請求頁面的方法獨立出來,并用 async 修飾,這樣就得到了一個 coroutine 對象,我們運行一下看看:

Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Cost?time:?15.134317874908447

還是不行,它還不是異步執行,也就是說我們僅僅將涉及 IO 操作的代碼封裝到 async 修飾的方法里面是不可行的!我們必須要使用支持異步操作的請求方式才可以實現真正的異步,所以這里就需要 aiohttp 派上用場了。

?

3.5 使用 aiohttp

aiohttp 是一個支持異步請求的庫,利用它和 asyncio 配合我們可以非常方便地實現異步請求操作。

安裝方式如下:

pip3?install?aiohttp

官方文檔鏈接為:https://aiohttp.readthedocs.io/,它分為兩部分,一部分是 Client,一部分是 Server,詳細的內容可以參考官方文檔。

下面我們將 aiohttp 用上來,將代碼改成如下樣子:

import asyncio import aiohttp import timestart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)# #############################################'''注意 加 await 和 不加 await 區別,1. 加 await 時,可以掛起當前函數,讓出控制權2. 不加 await 時,不會掛起當前函數,即函數順序執行完返回。可以 對比輸出結果理解'''# result = await get(url)result = get(url)# #############################################print('Get response from', url, 'Result:', result)tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print('Cost time:', end - start)

在這里我們將請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get() 方法進行請求,結果如下:

Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Cost?time:?3.0199508666992188

成功了!我們發現這次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5。

代碼里面我們使用了 await,后面跟了 get() 方法,在執行這五個協程的時候,如果遇到了 await,那么就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。

開始運行時,事件循環會運行第一個 task,針對第一個 task 來說,當執行到第一個 await 跟著的 get() 方法時,它被掛起,但這個 get() 方法第一步的執行是非阻塞的,掛起之后立馬被喚醒,所以立即又進入執行,創建了 ClientSession 對象,接著遇到了第二個 await,調用了 session.get() 請求方法,然后就被掛起了,由于請求需要耗時很久,所以一直沒有被喚醒,好第一個 task 被掛起了,那接下來該怎么辦呢?事件循環會尋找當前未被掛起的協程繼續執行,于是就轉而執行第二個 task 了,也是一樣的流程操作,直到執行了第五個 task 的 session.get() 方法之后,全部的 task 都被掛起了。所有 task 都已經處于掛起狀態,那咋辦?只好等待了。3 秒之后,幾個請求幾乎同時都有了響應,然后幾個 task 也被喚醒接著執行,輸出請求結果,最后耗時,3 秒!

怎么樣?這就是異步操作的便捷之處,當遇到阻塞式操作時,任務被掛起,程序接著去執行其他的任務,而不是傻傻地等著,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。

有人就會說了,既然這樣的話,在上面的例子中,在發出網絡請求后,既然接下來的 3 秒都是在等待的,在 3 秒之內,CPU 可以處理的 task 數量遠不止這些,那么豈不是我們放 10 個、20 個、50 個、100 個、1000 個 task 一起執行,最后得到所有結果的耗時不都是 3 秒左右嗎?因為這幾個任務被掛起后都是一起等待的。

理論來說確實是這樣的,不過有個前提,那就是服務器在同一時刻接受無限次請求都能保證正常返回結果,也就是服務器無限抗壓,另外還要忽略 IO 傳輸時延,確實可以做到無限 task 一起執行且在預想時間內得到結果。

我們這里將 task 數量設置成 100,再試一下:

tasks?=?[asyncio.ensure_future(request())?for?_?in?range(100)]

耗時結果如下:

Cost?time:?3.106252670288086

最后運行時間也是在 3 秒左右,當然多出來的時間就是 IO 時延了。

可見,使用了異步協程之后,我們幾乎可以在相同的時間內實現成百上千倍次的網絡請求,把這個運用在爬蟲中,速度提升可謂是非常可觀了。

關于 await 補充說明:假設有兩個異步函數 async a,async b,a 中的某一步有 await,當程序碰到關鍵字 await b() 后,異步程序掛起后去執行另一個異步b程序,就是從函數內部跳出去執行其他函數,當掛起條件消失后,不管b是否執行完,要馬上從b程序中跳出來,回到原程序執行原來的操作。如果 await 后面跟的 b 函數不是異步函數,那么操作就只能等 b 執行完再返回,無法在 b 執行的過程中返回。如果要在 b 執行完才返回,也就不需要用 await 關鍵字了,直接調用 b 函數就行。所以這就需要 await 后面跟的是 異步函數了。在一個異步函數中,可以不止一次掛起,也就是可以用多個 await 。

?

示例 2:

import asyncio import aiohttptemplate = 'http://exercise.kingname.info/exercise_middleware_ip/{page}'async def get(session, queue):while True:try:page = queue.get_nowait()except asyncio.QueueEmpty:returnurl = template.format(page=page)resp = await session.get(url)print(await resp.text(encoding='utf-8'))async def main():async with aiohttp.ClientSession() as session:queue = asyncio.Queue()for page in range(1000):queue.put_nowait(page)tasks = []for _ in range(1000):task = get(session, queue)tasks.append(task)await asyncio.wait(tasks)loop = asyncio.get_event_loop() loop.run_until_complete(main())

讓這個爬蟲爬1000頁的內容,我們來看看下面這個視頻。

可以看到,目前這個速度已經可以跟 Scrapy 比一比了。并且大家需要知道,這個爬蟲只有1個進程1個線程,它是通過異步的方式達到這個速度的。為什么速度能快那么多呢?

關鍵的代碼,就在:

tasks = [] for _ in range(100):task = get(session, queue)tasks.append(task) await asyncio.wait(tasks)

asyncio.wait?會在所有協程全部結束的時候才返回。

但是我們把1000個 URL 放在asyncio.Queue生成的一個異步隊列里面,每一個協程都通過 while True 不停從這個異步隊列里面取 URL 并進行訪問,直到異步隊列為空,退出。程序運行時,Python 會自動調度這100個協程,當一個協程在等待網絡 IO 返回時,切換到第二個協程并發起請求,在這個協程等待返回時,繼續切換到第三個協程并發起請求……。程序充分利用了網絡 IO 的等待時間,從而大大提高了運行速度。

?

?

3.6 與單進程、多進程對比

可能有的小伙伴非常想知道上面的例子中,如果 100 次請求,不是用異步協程的話,使用單進程和多進程會耗費多少時間,我們來測試一下:

首先來測試一下單進程的時間:

import requests import timestart = time.time()def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)result = requests.get(url).textprint('Get response from', url, 'Result:', result)for _ in range(100):request()end = time.time() print('Cost time:', end - start)

最后耗時:

Cost?time:?305.16639709472656

接下來我們使用多進程來測試下,使用 multiprocessing 庫:

import requests import time import multiprocessingstart = time.time()def request(_):url = 'http://127.0.0.1:5000'print('Waiting for', url)result = requests.get(url).textprint('Get response from', url, 'Result:', result)cpu_count = multiprocessing.cpu_count() print('Cpu count:', cpu_count) pool = multiprocessing.Pool(cpu_count) pool.map(request, range(100))end = time.time() print('Cost time:', end - start)

這里我使用了 multiprocessing 里面的 Pool 類,即進程池。我的電腦的 CPU 個數是 8 個,這里的進程池的大小就是 8。

運行時間:

Cost?time:?48.17306900024414

可見 multiprocessing 相比單線程來說,還是可以大大提高效率的。

?

3.7 與多進程的結合

既然異步協程和多進程對網絡請求都有提升,那么為什么不把二者結合起來呢?在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點,并開發了一個新的庫,叫做 aiomultiprocess,感興趣的可以了解下:https://www.youtube.com/watch?v=0kXaLh8Fz3k。

這個庫的安裝方式是:

pip3?install?aiomultiprocess

需要 Python 3.6 及更高版本才可使用。

使用這個庫,我們可以將上面的例子改寫如下:

import asyncio import aiohttp import time from aiomultiprocess import Poolstart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'urls = [url for _ in range(100)]async with Pool() as pool:result = await pool.map(get, urls)return resultcoroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task)end = time.time() print('Cost time:', end - start)

這樣就會同時使用多進程和異步協程進行請求,當然最后的結果其實和異步是差不多的:

Cost?time:?3.1156570434570312

因為我的測試接口的原因,最快的響應也是 3 秒,所以這部分多余的時間基本都是 IO 傳輸時延。但在真實情況下,我們在做爬取的時候遇到的情況千變萬化,一方面我們使用異步協程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節省時間其實還是非常可觀的。

以上便是 Python 中協程的基本用法,希望對大家有幫助。

?

?

4. 參考來源

  • http://python.jobbole.com/87310/
  • https://www.cnblogs.com/xybaby/p/6406191.html
  • http://python.jobbole.com/88291/
  • http://lotabout.me/2017/understand-python-asyncio/
  • https://segmentfault.com/a/1190000008814676
  • https://www.cnblogs.com/animalize/p/4738941.html

轉載請注明:靜覓???Python中異步協程的使用方法介紹

?

?

?

總結

以上是生活随笔為你收集整理的Python 中 异步协程 的 使用方法介绍的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。