日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【进阶】 --- 多线程、多进程、异步IO实用例子

發布時間:2024/7/23 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【进阶】 --- 多线程、多进程、异步IO实用例子 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【進階】 --- 多線程、多進程、異步IO實用例子:https://blog.csdn.net/lu8000/article/details/82315576

python之爬蟲_并發(串行、多線程、多進程、異步IO):https://www.cnblogs.com/fat39/archive/2004/01/13/9044474.html

Python 并發總結,多線程,多進程,異步IO:https://www.cnblogs.com/junmoxiao/p/11948993.html

asyncio --- 異步 I/O 官方文檔:https://docs.python.org/zh-cn/3.10/library/asyncio.html
關于asyncio異步io并發編程:https://zhuanlan.zhihu.com/p/158641367

支持?asyncio 的異步Python庫:https://github.com/aio-libs

知乎專欄:?Python爬蟲深入詳解 之??Python中協程異步IO(asyncio)詳解(?https://zhuanlan.zhihu.com/p/59621713

asyncio:異步I/O、事件循環和并發工具:https://www.cnblogs.com/sidianok/p/12210857.html

在編寫爬蟲時,性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引起等待,從而使得請求整體變慢。以下代碼默認運行環境為 python3。

  • httpie:HTTPie使用詳解:https://zhuanlan.zhihu.com/p/45093545
  • grequests,Requests + Gevent,訪問:https://github.com/kennethreitz/grequests
  • gevent,一個高并發的網絡性能庫,訪問:http://www.gevent.org/
  • twisted,基于事件驅動的網絡引擎框架。訪問:https://twistedmatrix.com/trac/

目錄

一、多線程、多進程
? ? ? ? 1.同步執行
? ? ? ? 2.多線程執行
? ? ? ? 3.多線程+回調函數執行
? ? ? ? 4.多進程執行
? ? ? ? 5.多進程+回調函數執行

二、異步?
????????1.asyncio 示例 1
??????????asyncio 示例 2?
??????????python異步編程之asyncio(百萬并發)
? ? ? ? ? 學習 python 高并發模塊 asynio
????????2.asyncio + aiohttp
????????3.asyncio + requests?
????????4.gevent + requests
????????5.grequests
????????6.Twisted示例
????????7.Tornado
????????8.Twisted更多
????????9.史上最牛逼的異步 IO 模塊

一、多線程、多進程

1. 同步執行

示例 1(?同步執行

import requestsdef fetch_sync(r_url=None):response = requests.get(r_url, verify=False)return responseif __name__ == '__main__':url_list = ['https://www.github.com', 'https://www.bing.com']for url in url_list:fetch_sync(url)

示例 2(?同步執行

import requests import time from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406', ]def get_title(url: str):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/86.0.4240.183 Safari/537.36'}r = requests.get(url, headers=headers)if 200 == r.status_code:title = etree.HTML(r.content).xpath('//h1[@class="title-article"]/text()')[0]print(title)else:print(f'[status_code:{r.status_code}]:{r.url}')def main():for url in urls:get_title(url)if __name__ == '__main__':start = time.time()main()print(f'cost time: {time.time() - start}s')

使用 httpx 模塊的同步調用( httpx 即可同步,也可異步

import time import httpxdef make_request(client):resp = client.get('http://httpbin.org/get')result = resp.json()print(f'status_code : {resp.status_code}')assert 200 == resp.status_codedef main():session = httpx.Client()# 100 次調用for _ in range(100):make_request(session)if __name__ == '__main__':# 開始start = time.time()main()# 結束end = time.time()print(f'同步:發送100次請求,耗時:{end - start}')

2. 多線程執行(線程池)

from concurrent.futures import ThreadPoolExecutor import requestsdef fetch_sync(r_url):response = requests.get(r_url)return responseurl_list = ['https://www.github.com', 'https://www.bing.com'] pool = ThreadPoolExecutor(5) for url in url_list:pool.submit(fetch_sync, url) pool.shutdown(wait=True)

3. 多線程 + 回調函數執行

from concurrent.futures import ThreadPoolExecutor import requestsdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())url_list = ['https://www.github.com', 'https://www.bing.com'] pool = ThreadPoolExecutor(5) for url in url_list:v = pool.submit(fetch_sync, url)v.add_done_callback(callback) pool.shutdown(wait=True)

4. 多進程執行

import requests from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url)return responseif __name__ == '__main__':url_list = ['https://www.github.com', 'https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:res = [executor.submit(fetch_sync, url) for url in url_list]print(res)

示例 :

import requests from concurrent import futures import time import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_sync(args):i, r_url = argsprint(f'index : {i}')response = requests.get(r_url, verify=False)time.sleep(2)return response.status_codedef callback(future):print(future.result())if __name__ == '__main__':# url_list = ['https://www.github.com', 'https://www.bing.com']url = 'https://www.github.com'with futures.ProcessPoolExecutor(5) as executor:for index in range(1000):v = executor.submit(fetch_sync, (index, url))v.add_done_callback(callback)pass

5. 多進程 + 回調函數執行

import requests from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())if __name__ == '__main__':url_list = ['https://www.github.com', 'https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:for url in url_list:v = executor.submit(fetch_sync, url)v.add_done_callback(callback)pass

二、異步?

對于 事件循環可以動態的增加協程?到?事件循環 中,?而不是在一開始就確定所有需要協程
協程 只運行在 事件循環 中。
默認情況下 asyncio.get_event_loop() 是一個 select模型事件循環
默認的?asyncio.get_event_loop() 事件循環屬于主線程。

參考? python asyncio 協程:https://blog.csdn.net/dashoumeixi/article/details/81001681

一般是一個線程一個事件循環,為什么要一個線程一個事件循環?

[ 這個東西不去了解完全不會影響 asyncio 包的學習,如果不懂的話記住一個線程一個事件循環就得了]

如果你要使用多個事件循環 ,創建線程后調用

lp = asyncio.new_event_loop() #創建一個新的事件循環 asyncio.set_event_loop(lp) #設置當前線程的事件循環

核心思想: yield from / await 就這2個關鍵字,運行(驅動)一個協程,?同時交出當前函數的控制權,讓事件循環執行下個任務。

yield from 的實現原理:yield from實現:https://blog.csdn.net/dashoumeixi/article/details/84076812

要搞懂 asyncio 協程,還是先把生成器弄懂,如果對生成器很模糊,比如 yield from 生成器對象,這個看不懂的話,建議先看 :python生成器 yield from:https://blog.csdn.net/dashoumeixi/article/details/80936798

有 2 種方式讓協程運行起來,協程(生成器)本身是不運行的

  • 1. await / yield from??協程,這一組能等待協程完成。
  • 2. asyncio.ensure_future / async(協程)這一組不需要等待協程完成。

注意:

  • 1. 協程就是生成器的增強版 ( 多了send 與 yield 的接收 ),在 asyncio 中的協程 與 生成器對象不同點:?
    ? ? ? ? ?asyncio協程:?函數內部不能使用 yield [如果使用會拋RuntimeError],只能使用 yield from / await,?
    ? ? ? ? 一般的生成器: yield 或 yield from 2個都能用,至少使用一個。這 2個本來就是一回事,?協程只是不能使用 yield? ?
  • 2. 在 asycio 中所有的協程都被自動包裝成一個個 Task / Future 對象,但其本質還是一個生成器,因此可以 yield from / await???Task/Futrue

基本流程:

  • 1. 定義一個協程 (async def 或者 @asyncio.coroutine 裝飾的函數)
  • 2. 調用上述函數,獲取一個協程對象 【不能使用yield,除非你自己寫異步模塊,畢竟最終所調用的還是基于yield的生成器函數】。通過 asyncio.ensure_futureasyncio.async 函數調度協程(這部意味著要開始執行了) ,返回了一個 Task 對象,Task對象 是 Future對象 的 子類,?( 這步可作可不作,只要是一個協程對象,一旦扔進事件隊列中,將自動給你封裝成Task對象 )
  • 3. 獲取一個事件循環???asyncio.get_event_loop() ,默認此事件循環屬于主線程
  • 4. 等待事件循環調度協程

后面的例子著重說明了一下 as_completed,附加了源碼。? 先說明一下:

  • ????1. as_completed 每次迭代返回一個協程,
  • ????2. 這個協程內部從 Queue 中取出先完成的 Future 對象
  • ????3. 然后我們再 await coroutine

示例 1:

import asyncio"""第一個例子沒什么用.注意: 協程 與 生成器 的用法是一樣的. 需要調用之后才產生對象. """async def func():print('hi')lp = asyncio.get_event_loop() # 獲取事件循環# 放到進事件循環里.注意,func() 而不是func. 需要調用之后才是協程對象. lp.run_until_complete(func())

示例 2:

import asyncio"""用async def (新語法) 定義一個函數,同時返回值asyncio.sleep 模擬IO阻塞情況 ; await 相當于 yield from.await 或者 yield from 交出函數控制權(中斷),讓事件循環執行下個任務 ,一邊等待后面的協程完成 """async def func(i):print('start')await asyncio.sleep(i) # 交出控制權print('done')return ico = func(2) # 產生協程對象 print(co) lp = asyncio.get_event_loop() # 獲取事件循環 task = asyncio.ensure_future(co) # 開始調度 lp.run_until_complete(task) # 等待完成 print(task.result()) # 獲取結果

添加回調

示例 1:

import asyncio"""添加一個回調:add_done_callback """async def func(i):print('start')await asyncio.sleep(i)return idef call_back(v):print('callback , arg:', v, 'result:', v.result())if __name__ == '__main__':co = func(2) # 產生協程對象lp = asyncio.get_event_loop() # 獲取事件循環# task = asyncio.run_coroutine_threadsafe(co) # 開始調度task = asyncio.ensure_future(co) # 開始調度task.add_done_callback(call_back) # 增加回調lp.run_until_complete(task) # 等待print(task.result()) # 獲取結果

子協程調用原理圖

官方的一個實例如下

從下面的原理圖我們可以看到

  • 1 當 事件循環 處于運行狀態的時候,任務Task 處于pending(等待),會把控制權交給委托生成器 print_sum
  • 2??委托生成器 print_sum 會建立一個雙向通道為Task和子生成器,調用子生成器compute并把值傳遞過去
  • 3??子生成器compute會通過委托生成器建立的雙向通道把自己當前的狀態suspending(暫停)傳給Task,Task 告訴 loop 它數據還沒處理完成
  • 4??loop 會循環檢測 Task ,Task 通過雙向通道去看自生成器是否處理完成
  • 5 子生成器處理完成后會向委托生成器拋出一個異常和計算的值,并關閉生成器
  • 6 委托生成器再把異常拋給任務(Task),把任務關閉
  • 7??loop 停止循環

call_soon、call_at、call_later、call_soon_threadsafe

  • call_soon? ? 循環開始檢測時,立即執行一個回調函數
  • call_at? ? 循環開始的第幾秒s執行
  • call_later? ? 循環開始后10s后執行
  • call_soom_threadsafe? ? 立即執行一個安全的線程
import asyncioimport timedef call_back(str_var, loop):print("success time {}".format(str_var))def stop_loop(str_var, loop):time.sleep(str_var)loop.stop()# call_later, call_at if __name__ == "__main__":event_loop = asyncio.get_event_loop()event_loop.call_soon(call_back, 'loop 循環開始檢測立即執行', event_loop)now = event_loop.time() # loop 循環時間event_loop.call_at(now + 2, call_back, 2, event_loop)event_loop.call_at(now + 1, call_back, 1, event_loop)event_loop.call_at(now + 3, call_back, 3, event_loop)event_loop.call_later(6, call_back, "6s后執行", event_loop)# event_loop.call_soon_threadsafe(stop_loop, event_loop)event_loop.run_forever()

不同線程中的事件循環

事件循環中維護了一個隊列(FIFO, Queue) ,通過另一種方式來調用:

import time import datetime import asyncio"""事件循環中維護了一個FIFO隊列通過call_soon 通知事件循環來調度一個函數. """def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')time.sleep(x)print(f'func invoked:{x}')loop = asyncio.get_event_loop() loop.call_soon(func, 1) # 調度一個函數 loop.call_soon(func, 2) loop.call_soon(func, 3) loop.run_forever() # 阻塞''' x:1, start time:2020-10-01 15:45:46 func invoked:1 x:2, start time:2020-10-01 15:45:47 func invoked:2 x:3, start time:2020-10-01 15:45:49 func invoked:3 '''

可以看到以上操作是同步的。下面通過?asyncio.run_coroutine_threadsafe?函數可以把上述函數調度變成異步執行:

import time import datetime import asyncio"""1.首先會調用asyncio.run_coroutine_threadsafe 這個函數.2.之前的普通函數修改成協程對象 """async def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(x)print(f'func invoked:{x}, now:{datetime.datetime.now().replace(microsecond=0)}')loop = asyncio.get_event_loop() co1 = func(1) co2 = func(2) co3 = func(3) asyncio.run_coroutine_threadsafe(co1, loop) # 調度 asyncio.run_coroutine_threadsafe(co2, loop) asyncio.run_coroutine_threadsafe(co3, loop) loop.run_forever() # 阻塞''' x:1, start time:2020-10-01 15:49:32 x:2, start time:2020-10-01 15:49:32 x:3, start time:2020-10-01 15:49:32 func invoked:1, now:2020-10-01 15:49:33 func invoked:2, now:2020-10-01 15:49:34 func invoked:3, now:2020-10-01 15:49:35 '''

上面 2 個例子只是告訴你 2 件事情。

  • 1. run_coroutine_threadsafe異步線程安全?,call_soon同步
  • 2. run_coroutine_threadsafe 這個函數 對應 ensure_future (只能作用于同一線程中)

可以在一個子線程中運行一個事件循環,然后在主線程中動態的添加協程,這樣既不阻塞主線程執行其他任務,子線程也可以異步的執行協程。

注意:默認情況下獲取的 event_loop 是主線程的,所以要在子線程中使用 event_loop 需要 new_event_loop 。如果在子線程中直接獲取 event_loop 會拋異常 。

源代碼中的判斷:isinstance(threading.current_thread(), threading._MainThread)

示例:

import os import sys import queue import threading import time import datetime import asyncio"""1. call_soon , call_soon_threadsafe 是同步的2. asyncio.run_coroutine_threadsafe(coro, loop) -> 對應 asyncio.ensure_future是在 事件循環中 異步執行。 """# 在子線程中執行一個事件循環 , 注意需要一個新的事件循環 def thread_loop(loop: asyncio.AbstractEventLoop):print('線程開啟 tid:', threading.currentThread().ident)asyncio.set_event_loop(loop) # 設置一個新的事件循環loop.run_forever() # run_forever 是阻塞函數,所以,子線程不會退出。async def func(x, q):current_time = datetime.datetime.now().replace(microsecond=0)msg = f'func: {x}, time:{current_time}, tid:{threading.currentThread().ident}'print(msg)await asyncio.sleep(x)q.put(x)if __name__ == '__main__':temp_queue = queue.Queue()lp = asyncio.new_event_loop() # 新建一個事件循環, 如果使用默認的, 則不能放入子線程thread_1 = threading.Thread(target=thread_loop, args=(lp,))thread_1.start()co1 = func(2, temp_queue) # 2個協程co2 = func(3, temp_queue)asyncio.run_coroutine_threadsafe(co1, lp) # 開始調度在子線程中的事件循環asyncio.run_coroutine_threadsafe(co2, lp)print(f'開始事件:{datetime.datetime.now().replace(microsecond=0)}')while 1:if temp_queue.empty():print('隊列為空,睡1秒繼續...')time.sleep(1)continuex = temp_queue.get() # 如果為空,get函數會直接阻塞,不往下執行current_time = datetime.datetime.now().replace(microsecond=0)msg = f'main :{x}, time:{current_time}'print(msg)time.sleep(1)

下面例子中 asyncio.ensure_future/async 都可以換成 asyncio.run_coroutine_threadsafe 【?在不同線程中的事件循環 】:

ThreadPollExecutor 和 asyncio 完成阻塞 IO 請求

在 asyncio 中集成線程池處理耗時IO

在協程中同步阻塞的寫法,但有些時候不得已就是一些同步耗時的接口

可以把 線程池 集成到 asynico 模塊中

import asyncio from concurrent import futurestask_list = [] loop = asyncio.get_event_loop() executor = futures.ThreadPoolExecutor(3)def get_url(t_url=None):print(t_url)for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)task_list.append(task)loop.run_until_complete(asyncio.wait(task_list))

示例代碼:

# 使用多線程:在 協程 中集成阻塞io import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparsedef get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) # 阻塞不會消耗cpu# 不停的詢問連接是否建立好, 需要while循環不停的去檢查狀態# 做計算任務或者再次發起其他的連接請求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)tasks = []for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{}".format(time.time() - start_time))

不用集成也是可以的,但是要在函數的前面加上 async 使同步變成異步寫法

#使用多線程:在攜程中集成阻塞io import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparse import timeasync def get_html(url):#通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"#建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) #阻塞不會消耗cpu#不停的詢問連接是否建立好, 需要while循環不停的去檢查狀態#做計算任務或者再次發起其他的連接請求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()tasks = [get_html("http://shop.projectsedu.com/goods/2/") for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print(time.time() - start_time)

asyncio 的 同步 和 通信

在多少線程中考慮安全性,需要加鎖,在協程中是不需要的

import asynciototal = 0 lock = Noneasync def add():global totalfor _ in range(1000):total += 1async def desc():global total, lockfor _ in range(1000):total -= 1if __name__ == '__main__':tasks = [add(), desc()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))print(total)

在有些情況中,對協程還是需要類似鎖的機制

示例:parse_response 和 use_response 有共同調用的代碼,get_response、parse_response 去請求的時候 如果 get_response 也去請求,會觸發網站的反爬蟲機制.
這就需要我們像上訴代碼那樣加 lock,同時 get_response 和 use_response?中都調用了parse_response,我們想在 get_response 中只請求一次,下次用緩存,所以要用到鎖

import asyncio import aiohttp from asyncio import Lockcache = {} lock = Lock()async def get_response(url):async with lock: # 等價于 with await lock: 還有async for 。。。類似的用法# 這里使用async with 是因為 Lock中有__await__ 和 __aenter__兩個魔法方法# 和線程一樣, 這里也可以用 await lock.acquire() 并在結束時 lock.releaseif url in cache:return cache[url]print("第一次請求")response = aiohttp.request('GET', url)cache[url] = responsereturn responseasync def parse_response(url):response = await get_response(url)print('parse_response', response)# do some parseasync def use_response(url):response = await get_response(url)print('use_response', response)# use response to do something interestingif __name__ == '__main__':tasks = [parse_response('baidu'), use_response('baidu')]loop = asyncio.get_event_loop()# loop.run_until_complete將task放到loop中,進行事件循環, 這里必須傳入的是一個listloop.run_until_complete(asyncio.wait(tasks))

輸出結果如下 

asyncio 通信 queue

協程是單線程的,所以協程中完全可以使用全局變量實現 queue 來相互通信,但是如果想要在 queue 中定義存放有限的最大數目,需要在 put 和 get 的前面都要加 await?

from asyncio import Queuequeue = Queue(maxsize=3) await queue.get() await queue.put()

一個事件循環中執行多個 task,實現并發執行

future task:?

  • future 是一個結果的容器,結果執行完后在內部會回調 call_back 函數
  • task future 的子類,可以用來激活協程。(?task 協程 Future 橋梁

waitgather、await

1. waitgather 這2個函數都是用于獲取結果的,且都不阻塞,直接返回一個生成器對象可用于 yield from / await

2. 兩種用法可以獲取執行完成后的結果:
? ? ? ? 第一種: result = asyncio.run_until_completed(asyncio.wait/gather)? ? 執行完成所有之后獲取結果
? ? ? ? 第二種: result = await?asyncio.wait/gather? ? ?在一個協程內獲取結果

3. as_completed 與并發包 concurrent 中的行為類似,哪個任務先完成哪個先返回,內部實現是 yield from Queue.get()

4. 嵌套:await / yield from 后跟協程,直到后面的協程運行完畢,才執行 await / yield from?下面的代碼整個過程是不阻塞的?

wait 和 gather 區別?

這兩個都可以添加多個任務到事件循環中

一般使用 asyncio.wait(tasks) 的地方也可以使用 asyncio.gather(tasks) ,但是?wait?接收一堆 task,gather接收一個 task 列表。

asyncio.wait(tasks)方法返回值是兩組 task/future的 set。dones, pendings = await asyncio.wait(tasks)?其中

  • dones 是 task的 set,
  • pendings 是 future 的 set。

asyncio.gather(tasks) 返回一個結果的 list。

gather 比 wait? 更加的高級

  • 可以對任務進行分組
  • 可以取消任務
import asyncio import timeasync def get_html(url):global indexprint(f"{index} start get url")await asyncio.sleep(2)index += 1print(f"{index} end get url")if __name__ == "__main__":start_time = time.time()index = 1loop = asyncio.get_event_loop()tasks = [get_html("http://www.imooc.com") for i in range(10)]# gather和wait的區別# tasks = [get_html("http://www.imooc.com") for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))group1 = [get_html("http://projectsedu.com") for i in range(2)]group2 = [get_html("http://www.imooc.com") for i in range(2)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)loop.run_until_complete(asyncio.gather(group1, group2))print(time.time() - start_time)

asf

示例 1:

import asyncio"""并發 執行多個任務。調度一個Task對象列表調用 asyncio.wait 或者 asyncio.gather 獲取結果 """async def func(i):print('start')# 交出控制權,事件循環執行下個任務,同時等待完成await asyncio.sleep(i)return iasync def func_sleep():await asyncio.sleep(2)def test_1():# asyncio create_task永遠運行# https://www.pythonheidong.com/blog/article/160584/ca5dc07f62899cedad64/lp = asyncio.get_event_loop()tasks = [lp.create_task(func(i)) for i in range(3)]lp.run_until_complete(func_sleep())# 或者# lp.run_until_complete(asyncio.wait([func_sleep(), ]))def test_2(): lp = asyncio.get_event_loop()# tasks = [func(i) for i in range(3)]# tasks = [asyncio.ensure_future(func(i)) for i in range(3)] # asyncio.ensure_future# 或者tasks = [lp.create_task(func(i)) for i in range(3)] # lp.create_tasklp.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())if __name__ == '__main__':# test_1()test_2()pass

示例 2:

import asyncio"""通過 await 或者 yield from 形成1個鏈, 后面跟其他協程. 形成一個鏈的目的很簡單,當前協程需要這個結果才能繼續執行下去.就跟普通函數調用其他函數獲取結果一樣 """async def func(i):print('start')await asyncio.sleep(i)return iasync def to_do():print('to_do start')tasks = []# 開始調度3個協程對象for i in range(3):tasks.append(asyncio.ensure_future(func(i)))# 在協程內等待結果. 通過 await 來交出控制權, 同時等待tasks完成task_done, task_pending = await asyncio.wait(tasks)print('to_do get result')# 獲取已經完成的任務for task in task_done:print('task_done:', task.result())# 未完成的for task in task_pending:print('pending:', task)if __name__ == '__main__':lp = asyncio.get_event_loop() # 獲取事件循環lp.run_until_complete(to_do()) # 把協程對象放進去# lp.close() # 關閉事件循環

as_completed 函數返回一個迭代器,每次迭代一個協程。

事件循環內部有一個 Queue(queue.Queue 線程安全) ,?先完成的先入隊。

as_completed 迭代的協程源碼是 :? 注意 yield from 后面可以跟 iterable

#簡化版代碼 f = yield from done.get() # done 是 Queue return f.result()

例子:

asyncio.as_completed 返回一個生成器對象 , 因此可用于迭代

每次從此生成器中返回的對象是一個個協程(生成器),哪個最先完成哪個就返回,?而要從 生成器/協程 中獲取返回值,就必須使用 yield from / await , 簡單來說就是:生成器的返回值在異常中,?詳情參考最上面的基礎鏈接

import asyncioasync def func(x):# print('\t\tstart ',x)await asyncio.sleep(5)# print('\t\tdone ', x)return xasync def to_do():# 在協程內調度2個協程tasks = [asyncio.ensure_future(func(i)) for i in range(2)]# 使用as_completed:先完成,先返回.# 每次迭代返回一個協程.# 這個協程:_wait_for_one,內部從隊列中產出一個最先完成的Future對象for coroutine in asyncio.as_completed(tasks):result = await coroutine # 等待協程,并返回先完成的協程print('result :', result)print('all done')lp = asyncio.get_event_loop() lp.set_debug(True) lp.run_until_complete(to_do()) # 調度協程

獲取多個并發的 task 的結果。

(?task 協程Future 橋梁

  • 如果我們要獲取 task 的結果,一定要創建一個task,就是把我們的協程綁定要 task 上,這里直接用事件循環 loop 里面的 create_task 就可以搞定。

  • 我們假設有3個并發的add任務需要處理,然后調用 run_until_complete 來等待3個并發任務完成。

  • 調用 task.result 查看結果,這里的 task 其實是 _asyncio.Task,是封裝的一個類。大家可以在 Pycharm 中找 asyncio 里面的源碼,里面有一個 tasks 文件。

爬取有道詞典

玩并發比較多的是爬蟲,爬蟲可以用多線程,線程池去爬。但是我們用 requests 的時候是阻塞的,無法并發。所以我們要用一個更牛逼的庫 aiohttp,這個庫可以當成是異步的 requests。

1). 爬取有道詞典

有道翻譯的API已經做好了,我們可以直接調用爬取。然后解析網頁,獲取單詞的翻譯。然后解析網頁,網頁比較簡單,可以有很多方法解析。因為爬蟲文章已經泛濫了,我這里就不展開了,很容易就可以獲取單詞的解釋。

2). 代碼的核心框架

  • 設計一個異步的框架,生成一個事件循環

  • 創建一個專門去爬取網頁的協程,利用aiohttp去爬取網站內容

  • 生成多個要翻譯的單詞的url地址,組建一個異步的tasks, 扔到事件循環里面

  • 等待所有的頁面爬取完畢,然后用pyquery去一一解析網頁,獲取單詞的解釋,部分代碼如下:

import time import asyncio import aiohttp from pyquery import PyQuery as pqdef decode_html(html_content=None):url, resp_text = html_contentdoc = pq(resp_text)des = ''for li in doc.items('#phrsListTab .trans-container ul li'):des += li.text()return url, desasync def fetch(session: aiohttp.ClientSession = None, url=None):async with session.get(url=url) as resp:resp_text = await resp.text()return url, resp_textasync def main(word_list=None):url_list = ['http://dict.youdao.com/w/{}'.format(word) for word in word_list]temp_task_list = []async with aiohttp.ClientSession() as session:for url in url_list:temp_task_list.append(fetch(session, url))html_list = await asyncio.gather(*temp_task_list)for html_content in html_list:print(decode_html(html_content))if __name__ == '__main__':start_time = time.time()text = 'apple'word_list_1 = [ch for ch in text]word_list_2 = [text for _ in range(100)]loop = asyncio.get_event_loop()task_list = [main(word_list_1),main(word_list_2),]loop.run_until_complete(asyncio.wait(task_list))print(time.time() - start_time)

談到 http 接口調用,Requests 大家并不陌生,例如,robotframework-requests、HttpRunner 等 HTTP 接口測試庫/框架都是基于它開發。

這里將介紹另一款http接口測試框架 httpx,snaic 同樣也集成了 httpx 庫。httpx 的 API 和 Requests 高度一致。github:?https://github.com/encode/httpx

安裝:pip install httpx

httpx 簡單使用

import json import httpxr = httpx.get("http://httpbin.org/get") print(r.status_code) print(json.dumps(r.json(), ensure_ascii=False, indent=4))

帶參數的 post 調用

import json import httpxpayload = {'key1': 'value1', 'key2': 'value2'} r = httpx.post("http://httpbin.org/post", data=payload) print(r.status_code) print(json.dumps(r.json(), ensure_ascii=False, indent=4))

httpx 異步調用。接下來認識 httpx 的異步調用:

import json import httpx import asyncioasync def main():async with httpx.AsyncClient() as client:resp = await client.get('http://httpbin.org/get')result = resp.json()print(json.dumps(result, ensure_ascii=False, indent=4))asyncio.run(main())

httpx 異步調用

import httpx import asyncio import timeasync def request(client):global indexresp = await client.get('http://httpbin.org/get')index += 1result = resp.json()print(f'{index} status_code : {resp.status_code}')assert 200 == resp.status_codeasync def main():async with httpx.AsyncClient() as client:# 50 次調用task_list = []for _ in range(50):req = request(client)task = asyncio.create_task(req)task_list.append(task)await asyncio.gather(*task_list)if __name__ == "__main__":index = 0# 開始start = time.time()asyncio.run(main())# 結束end = time.time()print(f'異步:發送 50 次請求,耗時:{end - start}')

想搞懂 異步框架 和異步接口的調用,可以按這個路線學習:1.python異步編程、2.python Web異步框架(tornado/sanic)、3.異步接口調用(aiohttp/httpx)

1. asyncio

示例 1:( Python 3.5+ 之前的寫法??)

import asyncio@asyncio.coroutine def func1():print('before...func1......')yield from asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) loop.close()

改進,使用 async / await 關鍵字 (?Python 3.5+ 開始引入了新的語法?async?和?await )

import asyncioasync def func1():print('before...func1......')await asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) loop.close()

示例 2 :

import asyncioasync def fetch_async(host, url='/'):print(host, url)reader, writer = await asyncio.open_connection(host, 80)request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)request_header_content = bytes(request_header_content, encoding='utf-8')writer.write(request_header_content)await writer.drain()text = await reader.read()print(host, url, text)writer.close()tasks = [fetch_async('www.cnblogs.com', '/wupeiqi/'),fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091') ]loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()

示例 3:

#!/usr/bin/env python # encoding:utf-8 import asyncio import aiohttp import timeasync def download(url): # 通過async def定義的函數是原生的協程對象print("get: %s" % url)async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)# response = await resp.read()async def main():start = time.time()await asyncio.wait([download("http://www.163.com"),download("http://www.mi.com"),download("http://www.baidu.com")])end = time.time()print("Complete in {} seconds".format(end - start))loop = asyncio.get_event_loop() loop.run_until_complete(main())

Python 異步編程之 asyncio(百萬并發)

前言:python 由于 GIL(全局鎖)的存在,不能發揮多核的優勢,其性能一直飽受詬病。然而在 IO 密集型的網絡編程里,異步處理比同步處理能提升成百上千倍的效率,彌補了 python 性能方面的短板,如最新的微服務框架 japronto,resquests per second 可達百萬級。

python 還有一個優勢是庫(第三方庫)極為豐富,運用十分方便。asyncio 是 python3.4 版本引入到標準庫,python2x 沒有加這個庫,畢竟 python3x 才是未來啊,哈哈!python3.5 又加入了 async/await 特性。在學習 asyncio 之前,我們先來理清楚?同步/異步的概念

  • 同步?是指完成事務的邏輯,先執行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,順序執行。。。
  • 異步?是和同步相對的,異步是指在處理調用這個事務的之后,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態、通知、回調來通知調用者處理結果。

aiohttp 使用

  如果需要并發 http 請求怎么辦呢,通常是用 requests,但 requests 是同步的庫,如果想異步的話需要引入 aiohttp。這里引入一個類,from aiohttp import ClientSession,首先要建立一個 session 對象,然后用 session 對象去打開網頁。session 可以進行多項操作,比如 post、get、put、head 等。

基本用法:

async with ClientSession() as session:async with session.get(url) as response:

aiohttp 異步實現的例子:

import asyncio from aiohttp import ClientSessiontasks = [] url = "http://httpbin.org/get?args=hello_word"async def hello(t_url):async with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()# response = await req.text()# print(response)print(f'{req.url} : {req.status}')if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))

首先 async def 關鍵字定義了這是個異步函數,await 關鍵字加在需要等待的操作前面,req.read() 等待 request 響應,是個耗 IO 操作。然后使用 ClientSession 類發起 http 請求。

異步請求多個URL

如果我們需要請求多個 URL 該怎么辦呢?

  • 同步的做法:訪問多個 URL時,只需要加個 for 循環就可以了。
  • 但異步的實現方式并沒那么容易:在之前的基礎上需要將 hello()?包裝在 asyncio 的 Future 對象中,然后將 Future對象列表 作為 任務 傳遞給 事件循環
import datetime import asyncio from aiohttp import ClientSessiontask_list = [] url = "https://www.baidu.com/{}"async def hello(t_url):ret_val = Noneasync with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()print(f'Hello World:{datetime.datetime.now().replace(microsecond=0)}')# print(response)ret_val = req.statusreturn ret_valdef run():for i in range(5):one_task = asyncio.ensure_future(hello(url.format(i)))task_list.append(one_task)if __name__ == '__main__':loop = asyncio.get_event_loop()run()result = loop.run_until_complete(asyncio.wait(task_list))# 方法 1 : 獲取結果for task in task_list:print(task.result())# 方法 2 : 獲取結果finish_task, pending_task = resultprint(f'finish_task count:{len(pending_task)}')for task in finish_task:print(task.result())print(f'pending_task count:{len(pending_task)}')for task in pending_task:print(task.result())''' Hello World:2020-12-06 16:29:02 Hello World:2020-12-06 16:29:02 Hello World:2020-12-06 16:29:02 Hello World:2020-12-06 16:29:02 Hello World:2020-12-06 16:29:02 404 404 404 404 404 finish_task count:0 404 404 404 404 404 pending_task count:0 '''

收集 http 響應

上面介紹了訪問不同 URL 的異步實現方式,但是我們只是發出了請求,如果要把響應一一收集到一個列表中,最后保存到本地或者打印出來要怎么實現呢??

可通過 asyncio.gather(*tasks) 將響應全部收集起來,具體通過下面實例來演示。

import time import asyncio from aiohttp import ClientSessiontask_list = [] temp_url = "https://www.baidu.com/{}"async def hello(url=None):async with ClientSession() as session:async with session.get(url) as request:# print(request)print('Hello World:%s' % time.time())return await request.read()def run():for i in range(5):task = asyncio.ensure_future(hello(temp_url.format(i)))task_list.append(task)result = loop.run_until_complete(asyncio.gather(*task_list))print(f'len(result) : {len(result)}')for item in result:print(item)if __name__ == '__main__':loop = asyncio.get_event_loop()run()

限制并發數(最大文件描述符的限制)

提示:此方法也可用來作為異步爬蟲的限速方法(反反爬)

假如你的并發達到 2000 個,程序會報錯:ValueError: too many file descriptors in select()。報錯的原因字面上看是 Python 調取的 select 對打開的文件有最大數量的限制,這個其實是操作系統的限制,linux 打開文件的最大數默認是 1024,windows 默認是 509,超過了這個值,程序就開始報錯。這里我們有三種方法解決這個問題:

  • 1. 限制并發數量。(一次不要塞那么多任務,或者限制最大并發數量)
  • 2. 使用回調的方式
  • 3. 修改操作系統打開文件數的最大限制,在系統里有個配置文件可以修改默認值,具體步驟不再說明了。

不修改系統默認配置的話,個人推薦限制并發數的方法,設置并發數為 500,處理速度更快。

使用 semaphore = asyncio.Semaphore(500) 以及在協程中使用 async with semaphore:? 操作具體代碼如下:

# coding:utf-8 import time, asyncio, aiohttpurl = 'https://www.baidu.com/'index = 0async def hello(url, semaphore):global indexasync with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:print(f'{index} : ', end='')await asyncio.sleep(2)print(response.status)return await response.read()async def run():# 為了看效果,這是設置 100 個任務,并發限制為 5semaphore = asyncio.Semaphore(5) # 限制并發量為500to_get = [hello(url.format(), semaphore) for _ in range(100)] # 總共1000任務await asyncio.wait(to_get)if __name__ == '__main__':# now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())# loop.close()

示例代碼:

import asyncio import aiohttpasync def get_http(url):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as res:global countcount += 1print(count, res.status)if __name__ == '__main__':count = 0semaphore = asyncio.Semaphore(500)loop = asyncio.get_event_loop()temp_url = 'https://www.baidu.com/s?wd={0}'tasks = [get_http(temp_url.format(i)) for i in range(600)]loop.run_until_complete(asyncio.wait(tasks))loop.close()

示例代碼:

from aiohttp import ClientSession import asyncio# 限制協程并發量 async def hello(sem, num):async with sem:async with ClientSession() as session:async with session.get(f'http://httpbin.org/get?a={num}') as response:r = await response.read()print(f'[{num}]:{r}')await asyncio.sleep(1)def main():loop = asyncio.get_event_loop()tasks = []sem = asyncio.Semaphore(5) # thisfor index in range(100000):task = asyncio.ensure_future(hello(sem, index))tasks.append(task)feature = asyncio.ensure_future(asyncio.gather(*tasks))try:loop.run_until_complete(feature)finally:loop.close()if __name__ == "__main__":main()

aiohttp 實現高并發爬蟲 ( 異步 mysql?)

python asyncio并發編程:https://www.cnblogs.com/crazymagic/articles/10066619.html

# asyncio爬蟲, 去重, 入庫import asyncio import re import aiohttp import aiomysql from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com' waiting_urls = [] seen_urls = set() # 實際使用爬蟲去重時,數量過多,需要使用布隆過濾器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 獲取文章詳情并解析入庫html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 為了簡單, 只獲取title的內容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入數據庫# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的話, 不需要我們來處理這些邏輯。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls: # 是沒有處理過的url,則處理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql連接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必須設置, 這是坑, 不寫數據庫寫入不了中文數據async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

學習 python 高并發模塊 asynio

參考:Python黑魔法 --- 異步IO( asyncio) 協程:https://www.jianshu.com/p/b5e347b3a17c

Python 中重要的模塊 --- asyncio:https://www.cnblogs.com/zhaof/p/8490045.html
Python 協程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html

asyncio 是 python 用于解決異步io編程的一整套解決方案

創建一個 asyncio 的步驟如下

  • 創建一個 event_loop 事件循環,當啟動時,程序開啟一個無限循環,把一些函數注冊到事件循環上,當滿足事件發生的時候,調用相應的協程函數。
  • 創建協程: 使用 async 關鍵字定義的函數就是一個協程對象。在協程函數內部可以使用 await 關鍵字用于阻塞操作的掛起。
  • 將協程注冊到事件循環中。協程的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
  • 一、定義一個協程

    import time import asyncioasync def do_some_work(x):print("waiting:", x)start = time.time() # 這里是一個協程對象,這個時候do_some_work函數并沒有執行 coroutine = do_some_work(2) print(coroutine) # 創建一個事件loop loop = asyncio.get_event_loop() # 將協程注冊到事件循環,并啟動事件循環 loop.run_until_complete(coroutine)print("Time:", time.time() - start)

    二、創建一個 task

    一個協程對象 就是 一個原生可以掛起的函數任務則是對協程進一步封裝,其中包含了任務的各種狀態。

    在上面的代碼中,在注冊事件循環的時候,其實是 run_until_complete 方法將協程包裝成為了一個任務(task)對象。 task 對象是 Future類的子類,保存了協程運行后的狀態,用于未來獲取協程的結果

    import asyncio import timestart = lambda: time.time()async def do_some_work(x):print("waiting:", x)start = start()coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print(task) loop.run_until_complete(task) print(task) print("Time:", time.time() - start)

    loop.create_taskasyncio.async / asyncio.ensure_futureTask 有什么區別?

    BaseEventLoop.create_task(coro) 、asyncio.async(coro)、Task(coro)安排協同程序執行,這似乎也可以正常工作。那么,所有這些之間有什么區別?

    • 在 Python> = 3.5中,已將 async 設為關鍵字,所以 asyncio.async 必須替換為 asyncio.ensure_future
    • create_task 的存在理由:
      ? ? ? ? 第三方事件循環可以使用其自己的Task子類來實現互操作性。在這種情況下,結果類型是Task的子類。
      ? ? ? ? 這也意味著您不應直接創建 Task ,因為不同的事件循環可能具有不同的創建"任務"的方式。
    • 另一個重要區別是,除了接受協程外, ensure_future 也接受任何等待的對象;而 create_task 只接受協程。

    那么用 ensure_future 還是 create_task 函數聲明對比:

    • asyncio.ensure_future(coro_or_future, *, loop=None)
    • BaseEventLoop.create_task(coro)

    顯然,ensure_future 除了接受 coroutine 作為參數,還接受 future 作為參數。

    看 ensure_future 的代碼,會發現 ensure_future 內部在某些條件下會調用 create_task,綜上所述:

    • encure_future: 最高層的函數,推薦使用!
    • create_task: 在確定參數是 coroutine 的情況下可以使用。
    • Task: 可能很多時候也可以工作,但真的沒有使用的理由!

    為了 interoperability,第三方的事件循環可以使用自己的 Task 子類。這種情況下,返回結果的類型是 Task 的子類。

    所以,不要直接創建 Task 實例,應該使用 ensure_future() 函數或 BaseEventLoop.create_task() 方法。

    asyncio.ensure_futureBaseEventLoop.create_task 對比簡單的協同程序

    From:https://www.pythonheidong.com/blog/article/55686/127e90ac54b5ed388d52/

    看過幾個關于asyncio的基本Python 3.5教程,它們以各種方式執行相同的操作。在這段代碼中:

    import asyncioasync def doit(i):print("Start %d" % i)await asyncio.sleep(3)print("End %d" % i)return iif __name__ == '__main__':loop = asyncio.get_event_loop()# futures = [asyncio.ensure_future(doit(i), loop=loop) for i in range(10)]# futures = [loop.create_task(doit(i)) for i in range(10)]futures = [doit(i) for i in range(10)]result = loop.run_until_complete(asyncio.gather(*futures))print(result)

    上面定義?futures?變量的所有三個變體都實現了相同的結果,那么他們有什么區別?有些情況下我不能只使用最簡單的變體(協程的簡單列表)嗎?

    asyncio.create_task(coro) asyncio.ensure_future(obj)

    從 Python 3.7 開始,為此目的添加了?asyncio.create_task(coro)?高級功能,可以使用它來代替從 coroutimes 創建任務的其他方法。

    但是,如果需要從任意等待創建任務,應該使用?asyncio.ensure_future(obj)。

    推薦:使用?asyncio.ensure_future(obj) 來代替 asyncio.create_task(coro)

    ensure_future VS create_task

    ensure_future是創建一個方法Task從coroutine。它基于參數以不同的方式創建任務(包括使用create_task協同程序和類似未來的對象)。

    create_task是一種抽象的方法AbstractEventLoop。不同的事件循環可以不同的方式實現此功能。

    您應該使用ensure_future創建任務。create_task只有在你要實現自己的事件循環類型時才需要。

    “當從協程創建任務時,你應該使用適當命名的loop.create_task()”

    在任務中包裝協程 - 是一種在后臺啟動此協程的方法。這是一個例子:

    import asyncioasync def msg(text):await asyncio.sleep(0.1)print(text)async def long_operation():print('long_operation started')await asyncio.sleep(3)print('long_operation finished')async def main():await msg('first')# Now you want to start long_operation, but you don't want to wait it finised:# long_operation should be started, but second msg should be printed immediately.# Create task to do so:task = asyncio.ensure_future(long_operation())await msg('second')# Now, when you want, you can await task finised:await taskif __name__ == "__main__":loop = asyncio.get_event_loop()loop.run_until_complete(main())''' 輸出: first long_operation started second long_operation finished '''

    創建任務:

    • 可以通過 loop.create_task(coroutine) 創建 task,
    • 也可以通過 asyncio.ensure_future(coroutine) 創建 task。

    使用這兩種方式的區別在?官網(?https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future )上有提及。

    task / future 以及使用 async 創建的都是 awaitable 對象,都可以在 await 關鍵字之后使用。

    future 對象意味著在未來返回結果,可以搭配回調函數使用

    要真正運行一個協程,asyncio 提供了三種主要機制

    (?https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future?)

    • 1.??asyncio.run()?函數用來運行最高層級的入口點 "main()" 函數。
    import asyncioasync def main():print('hello')await asyncio.sleep(1)print('world')asyncio.run(main())
    • 2. 使用 await ( 即 等待一個協程?)
    import asyncio import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"started at {time.strftime('%X')}")await say_after(3, 'hello')await say_after(1, 'world')print(f"finished at {time.strftime('%X')}")asyncio.run(main())

    預期的輸出:

    started at 17:13:52 world hello finished at 17:13:55
    • 3. 使用?asyncio.create_task()?函數用來并發運行作為 asyncio?任務?的多個協程。
    import asyncio import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():task1 = asyncio.create_task(say_after(3, 'hello'))task2 = asyncio.create_task(say_after(1, 'world'))print(f"started at {time.strftime('%X')}")# Wait until both tasks are completed (should take# around 2 seconds.)await task1await task2asyncio.run(main())

    預期的輸出:

    started at 17:14:32 world hello finished at 17:14:34

    獲取協程的返回值

    • 1 創建一個任務 task
    • 2 通過調用 task.result 獲取協程的返回值
    import asyncio import timeasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://httpbin.org"))loop.run_until_complete(task)print(task.result())

    三、綁定回調

    執行成功進行回調處理

    可以通過? add_done_callback( 任務) 添加回調,因為這個函數只接受一個回調的函數名,不能傳參,我們想要傳參可以使用偏函數

    # 獲取協程的返回值 import asyncio import time from functools import partialasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"def callback(url, future):print(url)print("send email to bobby")if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://www.imooc.com"))task.add_done_callback(partial(callback, "http://www.imooc.com"))loop.run_until_complete(task)print(task.result())

    asnycio 異步請求+異步回調https://blog.csdn.net/u013917468/article/details/104609908

    當使用 ensure_feature 創建任務的時候,可以使用任務的 task.add_done_callback(callback)方法,獲得對象的協程返回值。

    import asyncio import timeasync def do_some_work(x):print("waiting:", x)return "Done after {}s".format(x)def callback(future):print("callback:", future.result())start = time.time() coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) print(task) task.add_done_callback(callback) print(task) loop.run_until_complete(task)

    四、阻塞 使用?await 讓出控制權,掛起當前操作 )

    前面提到 asynic 函數內部可以使用 await 來針對耗時的操作進行掛起。

    import asyncio import timeasync def do_some_work(x):print("waiting:", x)# await 后面就是調用耗時的操作await asyncio.sleep(x)return "Done after {}s".format(x)coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task)

    五、并發并行

    • 并發:同一時刻 同時 發生
    • 并行:同一時間 間隔 發生

    并發通常是指有多個任務需要同時進行,并行則是同一個時刻有多個任務執行.
    當有多個任務需要并行時,可以將任務先放置在任務隊列中,然后將任務隊列傳給 asynicio.wait 方法,這個方法會同時并行運行隊列中的任務。將其注冊到事件循環中。

    import asyncioasync def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3) ]loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))

    六、嵌套協程

    使用 async 可以定義協程,協程用于耗時的 io 操作,我們也可以封裝更多的 io 操作過程,這樣就實現了嵌套的協程,即一個協程中 await 了另外一個協程,如此連接起來。

    import asyncioasync def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]dones, pendings = await asyncio.wait(tasks)for task in dones:print("Task ret:", task.result())# results = await asyncio.gather(*tasks)# for result in results:# print("Task ret:",result)loop = asyncio.get_event_loop() loop.run_until_complete(main())

    使用 asyncio.wait 的結果如下,可見返回的結果 dones 并不一定按照順序輸出

    waiting: 1 waiting: 2 waiting: 4 Task ret: Done after 2s Task ret: Done after 4s Task ret: Done after 1s Time: 4.006587505340576

    使用 await asyncio.gather(*tasks) 得到的結果如下,是按照列表順序進行返回的

    waiting: 1 waiting: 2 waiting: 4 Task ret: Done after 1s Task ret: Done after 2s Task ret: Done after 4s Time: 4.004234313964844

    上面的程序將 main 也定義為協程。我們也可以不在 main 協程函數里處理結果,直接返回 await 的內容,那么最外層的 run_until_complete 將會返回main協程的結果。

    import asyncio import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]return await asyncio.gather(*tasks)# return await asyncio.wait(tasks)也可以使用。注意gather方法需要*這個標記start = now()loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) for result in results:print("Task ret:", result)print("Time:", now() - start)

    也可以使用 as_complete 方法實現嵌套協程

    import asyncio import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]for task in asyncio.as_completed(tasks):result = await taskprint("Task ret: {}".format(result))start = now()loop = asyncio.get_event_loop() loop.run_until_complete(main()) print("Time:", now() - start)

    七、協程停止

    創建 future 的時候,task 為 pending,事件循環調用執行的時候當然就是 running,調用完畢自然就是 done,如果需要停止事件循環,就需要先把 task 取消。可以使用 asyncio.Task 獲取事件循環的 task。
    future 對象有如下幾個狀態:Pending、Running、Done、Cacelled

    import asyncio import timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(2)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3), ]start = now()loop = asyncio.get_event_loop() try:loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e:print(asyncio.Task.all_tasks())for task in asyncio.Task.all_tasks():print(task.cancel())loop.stop()loop.run_forever() finally:loop.close()print("Time:", now() - start)

    啟動事件循環之后,馬上 ctrl+c,會觸發 run_until_complete 的執行異常 KeyBorardInterrupt。然后通過循環 asyncio.Task 取消 future。可以看到輸出如下:

    Waiting: 1 Waiting: 2 Waiting: 2 ^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>} False True True True Time: 1.0707225799560547

    True 表示 cannel 成功,loop stop 之后還需要再次開啟事件循環,最后在 close,不然還會拋出異常.

    循環 task,逐個 cancel 是一種方案,可是正如上面我們把 task 的列表封裝在 main 函數中,main 函數外進行事件循環的調用。這個時候,main 相當于最外出的一個task,那么處理包裝的main 函數即可。

    task取消和子協程調用原理

    程序運行時 通過 ctl +c 取消任務 調用task.cancel()取消任務

    import asyncio import timeasync def get_html(sleep_times):print("waiting")await asyncio.sleep(sleep_times)print("done after {}s".format(sleep_times))if __name__ == "__main__":task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:all_tasks = asyncio.Task.all_tasks()for task in all_tasks:print("cancel task")print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()

    在終端執行:python ceshi.py? ,運行成功后 按 ctl +c 取消任務  


    ?

    不同線程的事件循環( 線程、線程池 )

    很多時候,我們的事件循環用于注冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然后在新建一個線程,在新線程中啟動事件循環。當前線程不會被 block。

    import asyncio from threading import Thread import timenow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3)

    啟動上述代碼之后,當前線程不會被 block,新線程中會按照順序執行 call_soon_threadsafe 方法注冊的 more_work 方法, 后者因為 time.sleep 操作是同步阻塞的,因此運行完畢more_work 需要大致 6 + 3

    使用 線程池

    # -*- coding:utf-8 -*- import asyncio import time from concurrent.futures import ThreadPoolExecutorthread_pool = ThreadPoolExecutor(5) tasks = []func_now = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = func_now() new_loop = asyncio.new_event_loop()thread_pool.submit(start_loop, new_loop)print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3)

    主線程創建事件循環,子線程開啟無限事件循環

    import asyncio import time from threading import Threadnow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_some_work(x):print('Waiting {}'.format(x))await asyncio.sleep(x)print('Done after {}s'.format(x))def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start))asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

    上述的例子,主線程中創建一個 new_loop,然后在另外的子線程中開啟一個無限事件循環。 主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的并發操作,同時主線程又不會被 block。一共執行的時間大概在 6s 左右。

    master - worker 主從模式

    對于并發任務,通常是用生成消費模型,對隊列的處理可以使用類似 master-worker 的方式,master 主要用戶獲取隊列的 msg,worker 用戶處理消息。

    為了簡單起見,并且協程更適合單線程的方式,我們的主線程用來監聽隊列,子線程用于處理隊列。這里使用 redis 的隊列。主線程中有一個是無限循環,用戶消費隊列。

    import time import asyncio from threading import Thread import redisdef get_redis(): # 返回一個 redis 連接對象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)def start_loop(loop): # 開啟事件循環asyncio.set_event_loop(loop)loop.run_forever()async def worker(task):print('Start worker')while True:# start = now()# task = rcon.rpop("queue") # 從 redis 中 取出的數據# if not task:# await asyncio.sleep(1)# continueprint('Wait ', int(task)) # 取出了相應的任務await asyncio.sleep(int(task))print('Done ', task, now() - start)now = lambda: time.time() rcon = get_redis() start = now() # 創建一個事件循環 new_loop = asyncio.new_event_loop() # 創建一個線程 在新的線程中開啟事件循環 t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) # 設置線程為守護模式 t.start() # 開啟線程try:while True:task = rcon.rpop("queue") # 不斷從隊列中獲取任務if not task:time.sleep(1)continue# 包裝為 task ins, 傳入子線程中的事件循環asyncio.run_coroutine_threadsafe(worker(task), new_loop) except Exception as e:print('error', e)new_loop.stop() # 出現異常 關閉時間循環 finally:pass

    給隊列添加一些數據:

    127.0.0.1:6379[3]> lpush queue 2 (integer) 1 127.0.0.1:6379[3]> lpush queue 5 (integer) 1 127.0.0.1:6379[3]> lpush queue 1 (integer) 1 127.0.0.1:6379[3]> lpush queue 1

    可以看見輸出:

    Waiting 2 Done 2 Waiting 5 Waiting 1 Done 1 Waiting 1 Done 1 Done 5

    我們發起了一個耗時5s的操作,然后又發起了連個1s的操作,可以看見子線程并發的執行了這幾個任務,其中5s awati的時候,相繼執行了1s的兩個任務。

    改進:

    import time import redis import asyncio from threading import Threadredis_queue_name = 'redis_list:test'def get_redis(): # 返回一個 redis 連接對象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.StrictRedis(connection_pool=connection_pool)def add_data_to_redis_list(*args):redis_conn = get_redis()redis_conn.lpush(redis_queue_name, *args)def start_loop(loop=None): # 開啟事件循環asyncio.set_event_loop(loop)loop.run_forever()async def worker(task=None):print('Start worker')while True:# task = redis_conn.rpop("queue") # 從 redis 中 取出的數據# if not task:# await asyncio.sleep(1)# continueprint('Wait ', int(task)) # 取出了相應的任務# 這里只是簡單的睡眠傳入的秒數await asyncio.sleep(int(task))def main():redis_conn = get_redis()# 創建一個事件循環new_loop = asyncio.new_event_loop()# 創建一個線程 在新的線程中開啟事件循環t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True) # 設置線程為守護模式t.start() # 開啟線程try:while True:task = redis_conn.rpop(name=redis_queue_name) # 不斷從隊列中獲取任務if not task:time.sleep(1)continue# 包裝為 task , 傳入子線程中的事件循環asyncio.run_coroutine_threadsafe(worker(task), new_loop)except Exception as e:print('error', e)new_loop.stop() # 出現異常 關閉時間循環finally:new_loop.close()if __name__ == '__main__':# data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]# add_data_to_redis_list(*data_list)main()pass

    redis隊列模型( 生產者 --- 消費者?)

    參考:https://zhuanlan.zhihu.com/p/59621713

    下面代碼的主線程和雙向隊列的主線程有些不同,只是換了一種寫法而已,代碼如下

    生產者代碼:

    import redisconn_pool = redis.ConnectionPool(host='127.0.0.1') redis_conn = redis.Redis(connection_pool=conn_pool)redis_conn.lpush('coro_test', '1') redis_conn.lpush('coro_test', '2') redis_conn.lpush('coro_test', '3') redis_conn.lpush('coro_test', '4')

    消費者代碼:

    import asyncio from threading import Thread import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在執行name:', name)await asyncio.sleep(2)return '返回結果:' + nameredis_conn = get_redis()new_loop = asyncio.new_event_loop() loop_thread = Thread(target=start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start()# 循環接收redis消息并動態加入協程 while True:msg = redis_conn.rpop('coro_test')if msg:asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)

    改進:

    import asyncio from threading import Thread import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在執行name:', name)await asyncio.sleep(2)return '返回結果:' + nameif __name__ == '__main__':redis_queue_name = 'redis_list:test'redis_conn = get_redis()for num in range(1, 10):redis_conn.lpush(redis_queue_name, num)new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_thread_loop, args=(new_loop,))loop_thread.setDaemon(True)loop_thread.start()# 循環接收redis消息并動態加入協程while True:msg = redis_conn.rpop(name=redis_queue_name)if msg:asyncio.run_coroutine_threadsafe(thread_example('King_' + msg.decode('utf-8')), new_loop)pass

    停止子線程

    如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,會拋出KeyboardInterrupt錯誤,我們修改一下主循環:

    try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e:print(e)new_loop.stop()

    可是實際上并不好使,雖然主線程 try 了 KeyboardInterrupt異常,但是子線程并沒有退出,為了解決這個問題,可以設置子線程為守護線程,這樣當主線程結束的時候,子線程也隨機退出。

    new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) # 設置子線程為守護線程 t.start()try:while True:# print('start rpop')task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e:print(e)new_loop.stop()

    線程停止程序的時候,主線程退出后,子線程也隨機退出才了,并且停止了子線程的協程任務。

    aiohttp

    在消費隊列的時候,我們使用 asyncio 的 sleep 用于模擬耗時的 io 操作。以前有一個短信服務,需要在協程中請求遠程的短信 api,此時需要是需要使用 aiohttp 進行異步的 http 請求。大致代碼如下:

    server.py

    import time from flask import Flask, requestapp = Flask(__name__)@app.route('/<int:x>') def index(x):time.sleep(x)return "{} It works".format(x)@app.route('/error') def error():time.sleep(3)return "error!"if __name__ == '__main__':app.run(debug=True)

    /?接口表示短信接口,/error?表示請求/失敗之后的報警。

    async-custoimer.py

    import time import asyncio from threading import Thread import redis import aiohttpdef get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)return await resp.text()async def do_some_work(x):print('Waiting ', x)try:ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))print(ret)except Exception as e:try:print(await fetch(url='http://127.0.0.1:5000/error'))except Exception as e:print(e)else:print('Done {}'.format(x))new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) t.start()try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except Exception as e:print('error')new_loop.stop() finally:pass

    有一個問題需要注意,我們在fetch的時候try了異常,如果沒有try這個異常,即使發生了異常,子線程的事件循環也不會退出。主線程也不會退出,暫時沒找到辦法可以把子線程的異常raise傳播到主線程。(如果誰找到了比較好的方式,希望可以帶帶我)。

    對于 redis 的消費,還有一個 block 的方法:

    try:while True:_, task = rcon.brpop("queue")asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except Exception as e:print('error', e)new_loop.stop() finally:pass

    使用 brpop方法,會 block 住 task,如果主線程有消息,才會消費。測試了一下,似乎 brpop 的方式更適合這種隊列消費的模型。

    127.0.0.1:6379[3]> lpush queue 5 (integer) 1 127.0.0.1:6379[3]> lpush queue 1 (integer) 1 127.0.0.1:6379[3]> lpush queue 1

    可以看到結果

    Waiting 5 Waiting 1 Waiting 1 200 1 It works Done 1 200 1 It works Done 1 200 5 It works Done 5

    協程消費

    主線程用于監聽隊列,然后子線程的做事件循環的worker是一種方式。還有一種方式實現這種類似master-worker的方案。即把監聽隊列的無限循環邏輯一道協程中。程序初始化就創建若干個協程,實現類似并行的效果。

    import time import asyncio import redisnow = lambda : time.time()def get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()async def worker():print('Start worker')while True:start = now()task = rcon.rpop("queue")if not task:await asyncio.sleep(1)continueprint('Wait ', int(task))await asyncio.sleep(int(task))print('Done ', task, now() - start)def main():asyncio.ensure_future(worker())asyncio.ensure_future(worker())loop = asyncio.get_event_loop()try:loop.run_forever()except KeyboardInterrupt as e:print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())loop.stop()loop.run_forever()finally:loop.close()if __name__ == '__main__':main()

    這樣做就可以多多啟動幾個worker來監聽隊列。一樣可以到達效果。

    總結

    上述簡單的介紹了asyncio的用法,主要是理解事件循環,協程和任務,future的關系。異步編程不同于常見的同步編程,設計程序的執行流的時候,需要特別的注意。畢竟這和以往的編碼經驗有點不一樣。可是仔細想想,我們平時處事的時候,大腦會自然而然的實現異步協程。比如等待煮茶的時候,可以多寫幾行代碼。

    相關代碼文件的?Gist

    參考:Threaded Asynchronous Magic and How to Wield It

    示例:

    參考:Python 中的并發處理之 asyncio 包使用的詳解:https://www.jb51.net/article/137681.htm

    import asyncio import itertools import sysasync def spin(msg):for char in itertools.cycle('|/-\\'):status = char + ' ' + msgprint(status)try:# 使用 await asyncio.sleep(.1) 代替 time.sleep(.1),這樣的休眠不會阻塞事件循環。await asyncio.sleep(.1)except asyncio.CancelledError:# 如果 spin 函數蘇醒后拋出 asyncio.CancelledError 異常,其原因是發出了取消請求,因此退出循環。breakasync def slow_function():# 假裝等待I/O一段時間# await asyncio.sleep(3) 表達式把控制權交給主循環,在休眠結束后恢復這個協程。await asyncio.sleep(3)return 42async def supervisor():# asyncio.ensure_future(...) 函數排定 spin 協程的運行時間,使用一個 Task 對象包裝 spin 協程,并立即返回。spinner = asyncio.ensure_future(spin('thinking!'))print('spinner object:', spinner)t_result = await slow_function() # 驅動 slow_function() 函數。結束后,獲取返回值。# 同時,事件循環繼續運行,因為slow_function 函數最后使用 await asyncio.sleep(3) 表達式把控制權交回給了主循環。# Task 對象可以取消;取消后會在協程當前暫停的 yield 處拋出 asyncio.CancelledError 異常。# 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消。spinner.cancel()return t_resultif __name__ == '__main__':loop = asyncio.get_event_loop() # 獲取事件循環的引用# 驅動 supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值。result = loop.run_until_complete(supervisor())loop.close()print('Answer:', result)

    二、避免阻塞型調用

    1、有兩種方法能避免阻塞型調用中止整個應用程序的進程:

  • 在單獨的線程中運行各個阻塞型操作。
  • 把每個阻塞型操作轉換成非阻塞的異步調用。
  • 使用多線程處理大量連接時將耗費過多的內存,故此通常使用回調來實現異步調用。

    2、使用Executor對象防止阻塞事件循環:

    使用 loop.run_in_executor 把阻塞的作業(例如保存文件)委托給線程池做。

    示例:

    import asyncio import timeasync def shop(delay, what):print(what)await asyncio.sleep(delay)print(what,"...出來了")async def main():task1 = asyncio.create_task(shop(8, '女朋友看衣服..'))task2 = asyncio.create_task(shop(5, '體驗手機..'))print(time.ctime(), "開始逛街")await task1await task2print(time.ctime(), "結束.")asyncio.run(main())

    Python --- aiohttp 的使用

    參考:?https://www.cnblogs.com/ssyfj/p/9222342.html

    1. aiohttp 的簡單使用(配合asyncio模塊)

    import asyncio import aiohttpasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:# 或者直接 await r.read()不編碼,直接讀取,適合于圖像等無法編碼文件resp = await r.text(encoding="utf-8")print(resp)print('*' * 50)tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/') ]if __name__ == '__main__':event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    2. 發起一個 session 請求

    import asyncio import aiohttpasync def fetch_async(url):print(url)async with aiohttp.ClientSession() as session: # 協程嵌套,只需要處理最外層協程即可fetch_asyncasync with session.get(url) as resp:print(resp.status)# 因為這里使用到了 await 關鍵字,實現異步,所有他上面的函數體需要聲明為異步asyncprint(await resp.text())print('*' * 50)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.cnblogs.com/ssyfj/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    除了上面的 get 方法外,會話還支持 post,put,delete .... 等

    session.put('http://httpbin.org/put', data=b'data') session.delete('http://httpbin.org/delete') session.head('http://httpbin.org/get') session.options('http://httpbin.org/get') session.patch('http://httpbin.org/patch', data=b'data')

    不要為每次的連接都創建一次session,一般情況下只需要創建一個session,然后使用這個session執行所有的請求。

    每個session對象,內部包含了一個連接池,并且將會保持連接和連接復用(默認開啟)可以加快整體的性能。

    3. 在 url 中傳遞參數(其實與 requests 模塊使用大致相同)

    只需要將參數字典,傳入 params 參數中即可

    import asyncio import aiohttp from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)# print(await r.read())ss = etree.HTML(text=await r.text())movie_name = ss.xpath('//ol[@class="grid_view"]//div[@class="hd"]//a//span[1]//text()')print(f'len: {len(list(map(lambda i=None: print(i), movie_name)))}')if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    4. 獲取響應內容(由于獲取響應內容是一個阻塞耗時過程,所以我們使用await實現協程切換)

    (1)使用 text() 方法

    import asyncio import aiohttp from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset) # 查看默認編碼為 utf-8print(await r.text()) # 不編碼使用默認編碼,也可以使用 encoding 指定編碼if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    (2)使用 read() 方法,不進行編碼,為字節形式

    import asyncio import aiohttp from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    (3)注意:text()、read() 方法是把整個響應體讀入內存,如果你是獲取大量的數據,請考慮使用 "字節流"(StreamResponse)

    5. 特殊響應內容 json(和上面一樣)

    import asyncio import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset)# print(await r.json()) # 可以設置編碼,設置處理函數print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    6. 字節流形式獲取數據(不像 text、read 一次獲取所有數據)

    注意:我們獲取的 session.get() 是 Response 對象,他繼承于 StreamResponse

    import asyncio import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(await r.content.read(10)) # 讀取前10字節if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    下面字節流形式讀取數據,保存文件

    import asyncio import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    注意:

    async with session.get(url,params=params) as r:  # 異步上下文管理器with open(filename,"wb") as fp:   # 普通上下文管理器

    兩者的區別:在于異步上下文管理器中定義了?__aenter__和__aexit__方法

    異步上下文管理器指的是在enter和exit方法處能夠暫停執行的上下文管理器

    ?為了實現這樣的功能,需要加入兩個新的方法:__aenter__?和__aexit__。這兩個方法都要返回一個 awaitable類型的值。

    推文:異步上下文管理器async with和異步迭代器async for

    7. 自定義請求頭(和 requests 一樣)

    import asyncio import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:headers = {'Content-Type': 'text/html; charset=utf-8'}async with session.get(url, params=params, headers=headers) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    8. 自定義 cookie

    注意:對于自定義cookie,我們需要設置在 ClientSession(cookies=自定義cookie字典),而不是session.get()中

    class ClientSession:def __init__(self, *, connector=None, loop=None, cookies=None,headers=None, skip_auto_headers=None,auth=None, json_serialize=json.dumps,request_class=ClientRequest, response_class=ClientResponse,ws_response_class=ClientWebSocketResponse,version=http.HttpVersion11,cookie_jar=None, connector_owner=True, raise_for_status=False,read_timeout=sentinel, conn_timeout=None,timeout=sentinel,auto_decompress=True, trust_env=False,trace_configs=None):

    使用:

    cookies = {'cookies_are': 'working'} async with ClientSession(cookies=cookies) as session:

    9. 獲取當前訪問網站的 cookie

    async with session.get(url) as resp:print(resp.cookies)

    10. 獲取網站的響應狀態碼

    async with session.get(url) as resp:print(resp.status)

    11. 查看響應頭

    resp.headers # 來查看響應頭,得到的值類型是一個dict: resp.raw_headers   # 查看原生的響應頭,字節類型

    12. 查看重定向的響應頭(我們此時已經到了新的網址,向之前的網址查看)

    resp.history  # 查看被重定向之前的響應頭

    13. 超時處理

    默認的IO操作都有5分鐘的響應時間 我們可以通過 timeout 進行重寫:

    async with session.get('https://github.com', timeout=60) as r:...

    如果 timeout=None 或者 timeout=0 將不進行超時檢查,也就是不限時長。

    14. ClientSession 用于在多個連接之間(同一網站)共享cookie,請求頭等

    import asyncio import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as r:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))async with session.get("https://segmentfault.com/hottest") as rp:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close() Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q Set-Cookie: my_cookie=my_value Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q Set-Cookie: my_cookie=my_value

    我們最好使用session.cookie_jar.filter_cookies()獲取網站cookie,不同于requests模塊,雖然我們可以使用rp.cookies有可能獲取到cookie,但似乎并未獲取到所有的cookies。

    import asyncio import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as rp_1:print('1' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))# 首次訪問會獲取網站設置的 cookie# Set-Cookie: PHPSESSID=web2~jh3ouqoabvr4e72f87vtherkp6; Domain=segmentfault.com; Path=/ print('2' * 50)print(f'rp_1.cookies:{rp_1.cookies}')async with session.get("https://segmentfault.com/hottest") as rp_2:print('3' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('4' * 50)print(f'rp_2.cookies:{rp_2.cookies}') # 為空,服務端未設置 cookieasync with session.get("https://segmentfault.com/newest") as rp_3:print('5' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('6' * 50)print(f'rp_3.cookies:{rp_3.cookies}') # 為空,服務端未設置cookieif __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    運行結果:

    11111111111111111111111111111111111111111111111111 Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9 Set-Cookie: my_cookie=my_value 22222222222222222222222222222222222222222222222222 rp_1.cookies:Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9; Domain=segmentfault.com; Path=/ 33333333333333333333333333333333333333333333333333 Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9 Set-Cookie: my_cookie=my_value 44444444444444444444444444444444444444444444444444 rp_2.cookies: 55555555555555555555555555555555555555555555555555 Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9 Set-Cookie: my_cookie=my_value 66666666666666666666666666666666666666666666666666 rp_3.cookies:

    總結:

    當我們使用 rp.cookie 時,只會獲取到當前 url 下設置的 cookie,不會維護整站的cookie而session.cookie_jar.filter_cookies("https://segmentfault.com")會一直保留這個網站的所有設置cookies,含有我們在會話時設置的cookie,并且會根據響應修改更新cookie。這個才是我們需要的而我們設置cookie,也是需要在aiohttp.ClientSession(cookies=cookies)中設置

    ClientSession 還支持 請求頭,keep-alive連接和連接池(connection pooling)

    15. cookie的安全性

    默認 ClientSession 使用的是嚴格模式的 aiohttp.CookieJar. RFC 2109,明確的禁止接受 url 和 ip 地址產生的 cookie,只能接受 DNS 解析 IP 產生的 cookie。可以通過設置 aiohttp.CookieJar 的 unsafe=True 來配置:

    jar = aiohttp.CookieJar(unsafe=True) session = aiohttp.ClientSession(cookie_jar=jar)

    16. 控制同時連接的數量(連接池)

    TCPConnector 維持鏈接池,限制并行連接的總量,當池滿了,有請求退出再加入新請求

    import asyncio import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}conn = aiohttp.TCPConnector(limit=2) # 默認100,0表示無限async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:for i in range(7, 35):url = "https://www.ckook.com/list-%s-1.html" % iasync with session.get(url) as rp:print('---------------------------------')print(rp.status)if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    限制同時打開連接到同一端點的數量( (host, port, is_ssl) 三的倍數),可以通過設置 limit_per_host 參數:

    limit_per_host: 同一端點的最大連接數量。同一端點即(host, port, is_ssl)完全相同

    conn = aiohttp.TCPConnector(limit_per_host=30)#默認是0

    在協程下測試效果不明顯

    17. 自定義域名解析地址

    我們可以指定域名服務器的 IP 對我們提供的get或post的url進行解析:

    from aiohttp.resolver import AsyncResolverresolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"]) conn = aiohttp.TCPConnector(resolver=resolver)

    18. 設置代理

    aiohttp支持使用代理來訪問網頁:

    async with aiohttp.ClientSession() as session:async with session.get("http://python.org", proxy="http://some.proxy.com") as resp:print(resp.status)

    當然也支持需要授權的頁面:

    async with aiohttp.ClientSession() as session:proxy_auth = aiohttp.BasicAuth('user', 'pass') # 用戶,密碼async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp:print(resp.status)

    或者通過這種方式來驗證授權:

    session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

    19. post傳遞數據的方法

    (1)模擬表單

    payload = {'key1': 'value1', 'key2': 'value2'} async with session.post('http://httpbin.org/post', data=payload) as resp:print(await resp.text())

    注意:data=dict 的方式 post 的數據將被轉碼,和 form 提交數據是一樣的作用,如果你不想被轉碼,可以直接以字符串的形式 data=str 提交,這樣就不會被轉碼。

    (2)post json

    payload = {'some': 'data'}async with session.post(url, data=json.dumps(payload)) as resp:

    其實json.dumps(payload)返回的也是一個字符串,只不過這個字符串可以被識別為json格式

    (3)post 小文件

    url = 'http://httpbin.org/post' files = {'file': open('report.xls', 'rb')}await session.post(url, data=files) url = 'http://httpbin.org/post' data = FormData() data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')await session.post(url, data=data)

    如果將文件對象設置為數據參數,aiohttp將自動以字節流的形式發送給服務器。

    (4)post 大文件

    aiohttp支持多種類型的文件以流媒體的形式上傳,所以我們可以在文件未讀入內存的情況下發送大文件。

    @aiohttp.streamer def file_sender(writer, file_name=None):with open(file_name, 'rb') as f:chunk = f.read(2 ** 16)while chunk:yield from writer.write(chunk)chunk = f.read(2 ** 16)# Then you can use `file_sender` as a data provider:async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp:print(await resp.text())

    (5)從一個url獲取文件后,直接post給另一個url

    r = await session.get('http://python.org') await session.post('http://httpbin.org/post',data=r.content)

    (6)post 預壓縮數據

    在通過aiohttp發送前就已經壓縮的數據, 調用壓縮函數的函數名(通常是deflate 或 zlib)作為content-encoding的值:

    import zlibasync def my_coroutine(session, headers, my_data):data = zlib.compress(my_data)headers = {'Content-Encoding': 'deflate'}async with session.post('http://httpbin.org/post', data=data, headers=headers):pass

    Python 協程爬蟲? ---? aiohttp + aiomultiprocess 使用

    aiohttp 是基于 asyncio 的一個異步http客戶端和服務器

    官方文檔:https://aiohttp.readthedocs.io/en/stable/client_quickstart.html

    aiomultiprocess :https://github.com/omnilib/aiomultiprocess

    簡單實用例子

    async def funct(index): print("start ", index) async with aiohttp.ClientSession() as session: async with session.get("https://movie.douban.com/top250?start=0", timeout=5) as resp: print(resp.status)print(await resp.text()) print("end ", index)

    aiohttp.ClientSession()

    創建會話,session 提供了各種請求方法,如 get、post、delete、put 等。認識新的關鍵字 async with,因為是協程的上下文管理,所以多了async關鍵字。這個不是強制使用的,你也可以自己手動關閉會話,但是一定要記得關閉

    注意:

    • 1、不要為每個請求創建會話。每個應用程序很可能需要一個會話來執行所有請求。
    • 2、aiohttp?在發送請求之前在內部執行 URL?規范化。要禁用規范化,請使用?encoded=True?參數進行URL構建

    獲取響應信息

    resp.status? # 狀態碼 await resp.text()? # 獲取響應正文,可以指定編碼 await resp.read()? # 讀取二進制響應內容 await resp.json()? # 獲取json響應內容 await resp.content.read(size)?# 讀取流

    注意事項:aiohttp 是在 await resp.text() 之后才發起請求的,所以必須調用之后才能獲取響應的內容,不然會出現異常 aiohttp.client_exceptions.ClientConnectionError: Connection closed

    aiomultiprocess

    asyncio 和多處理本身是有用的,但有局限性:asyncio 仍然不能超過 GIL 的速度,并且多處理一次只能處理一個任務。但是,他們在一起可以充分實現自己的真正潛力。

    aiomultiprocess 提供了一個簡單的界面,同時在每個子進程上運行完整的 asyncio 事件循環,從而實現了 Python 應用程序從未有過的并發級別。每個子進程可以一次執行多個協程,僅受工作量和可用內核數限制。

    注:aiomultiprocess 需要 Python 3.6 或更高版本

    import asyncio from aiohttp import request from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:async for result in pool.map(get, urls):... # process resultif __name__ == '__main__':# Python 3.7asyncio.run(main())# Python 3.6# loop = asyncio.get_event_loop()# loop.run_until_complete(main())

    用法:在子進程中執行協程

    import asyncio from aiohttp import request from aiomultiprocess import Processasync def put(url, params):async with request("PUT", url, params=params) as response:passasync def main():p = Process(target=put, args=("https://jreese.sh", {}))await pif __name__ == "__main__":asyncio.run(main())

    如果您想從協程中獲取結果?Worker,請使用以下方法:

    import asyncio from aiohttp import request from aiomultiprocess import Workerasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():p = Worker(target=get, args=("https://jreese.sh",))response = await pif __name__ == "__main__":asyncio.run(main())

    如果您需要一個托管的工作進程池,請使用?Pool:

    import asyncio from aiohttp import request from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:result = await pool.map(get, urls)if __name__ == "__main__":asyncio.run(main())

    示例:

    import time import json import datetime import asyncio import hashlib from aiomultiprocess import Pool from redis import * from pybloom_live import BloomFilter import aiohttp# # Public variable #Bloom_data = BloomFilter(1000000000, 0.01) DB_get_question = StrictRedis(host='62.234.9.254', port=6480, password='lingmiao2015', db=4 ) pipeline_redis = DB_get_question.pipeline()# # Public functions #def md5(data):"""對數據進行MD5加密:param data::return:"""md5_qa = hashlib.md5(data.encode('utf8')).hexdigest()md5_qa = bytes(md5_qa, encoding='utf8')return md5_qaasync def get(data):"""協程函數:param url::return:"""# while True:# print('data:',data)# try:url = ''async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:get_proxy = DB_get_question.spop('IP_PROXY')response = await session.post(url, json=data, timeout=7, proxy={"http": "http://{}".format(get_proxy)})result = await response.text()hjson = json.loads(result)content = hjson['results'][0]['values']['text']# print('data:',data)print('\033[32;1mget_question\033[0m:', content)await asyncio.sleep(0.1)return content# except:# open('error_url.txt','a').write(url + '\n')# await get(data)async def request():"""使用進程加異步協程發送請求:return:"""key_number = 0datas = ['']split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '').replace('b', '')while len(datas) != 0:key_number += 1if len(datas) > 1:async with Pool() as pool:get_proxy = DB_get_question.spop('IP_PROXY')result_list = await pool.map(get, datas)# print(result_list)for result in result_list:if result:# print('key',key)# print('phone', phone)if '請求次數' in result or 'key不對' in result or '請求內容為空' in result:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')breakmd5_qa = md5(result)if md5_qa not in Bloom_data:Bloom_data.add(md5_qa)# pipeline_redis.lpush('total_question_list', result)pipeline_redis.sadd('get_question', result)pipeline_redis.execute()datas.clear()question_number = 0while True:question_number += 1pipeline_redis.spop('original_question_set')if question_number == 100:question_list = pipeline_redis.execute()breakdatas = {}print('datas', datas)print(datas)if key_number == 500:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')key2_set_number = DB_get_question.scard('key2_set')if key2_set_number < 5:with open('key2_total.txt', 'r') as f_key:for key in f_key:key = key.strip()pipeline_redis.sadd('key2_set', key)pipeline_redis.execute()key_number = 0coroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task)

    基于 asyncio、aiohttp、xpath 的異步爬蟲

    參看:https://blog.csdn.net/weixin_34290390/article/details/88772610

    # asyncio爬蟲, 去重, 入庫import asyncio import re import aiohttp import aiomysql from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com' waiting_urls = [] seen_urls = set() # 實際使用爬蟲去重時,數量過多,需要使用布隆過濾器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 獲取文章詳情并解析入庫html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 為了簡單, 只獲取title的內容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入數據庫# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的話, 不需要我們來處理這些邏輯。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls: # 是沒有處理過的url,則處理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql連接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必須設置, 這是坑, 不寫數據庫寫入不了中文數據async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

    改? ? 進( 生產者 --- 消費者 從 redis 取數據?)

    config.py

    import os import syssys.path.append(os.getcwd()) sys.path.append("..") sys.path.append(os.path.abspath("../../"))DEV_OR_PRD = 'dev' REDIS_CONFIG = Noneif 'dev' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,'db': 0,'password': None}pass elif 'prd' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,}pass

    produce.py

    import redis import json import requests from config import REDIS_CONFIGdef add_task():payload = {}headers = {'Host': 'www.kuwo.cn','csrf': 'E7D1IGX45D','Cookie': 'kw_token=E7D1IGX45D; kw_token=AKH1VOZ2767'}queue_name = 'redis_list:test'redis_conn = redis.StrictRedis(**REDIS_CONFIG)for _ in range(100):for page_num in range(1, 11):url = f"http://www.kuwo.cn/api/www/bang/bang/musicList?bangId=16&pn={page_num}&rn=30"response = requests.request("GET", url, headers=headers, data=payload)try:music_list = response.json()['data']['musicList']except BaseException as e:music_list = []for item in music_list:task = {'crawl_url': item['pic'],'song_name': item['name']}redis_conn.lpush(queue_name, json.dumps(task, ensure_ascii=False))print(f'task:{task}')print(f'page {page_num} end')if __name__ == '__main__':add_task()pass

    consumer.py

    import re import asyncio import aiohttp import redis import json import datetime from config import REDIS_CONFIGmax_worker = 50 current_worker = 0def write_file(future=None):resp_data = future.result()if not resp_data:returntime_int = int(datetime.datetime.now().timestamp() * 1000)with open(f'./img/{time_int}.jpg', 'wb') as f:f.write(resp_data)async def fetch(url, session):global current_workercurrent_worker += 1try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:# data = await resp.text()data = await resp.read()current_worker -= 1return dataexcept Exception as e:print(e)return Noneasync def consumer(redis_conn=None, queue_name=None):async with aiohttp.ClientSession() as session:while True:task_string = redis_conn.rpop(queue_name)cha = current_worker - max_workerif cha >= 0 or not task_string:print('超過最大worker, 或者任務為空, 睡1秒繼續。。。')await asyncio.sleep(1)continuetask_string = task_string.decode('utf-8')task_dict = json.loads(task_string)crawl_url = task_dict['crawl_url']asyncio_task = asyncio.ensure_future(fetch(crawl_url, session))asyncio_task.add_done_callback(write_file)# await asyncio.sleep(0.001)def main():queue_name = 'redis_list:test'redis_conn_pool = redis.ConnectionPool(**REDIS_CONFIG)redis_conn = redis.StrictRedis(connection_pool=redis_conn_pool)event_loop = asyncio.get_event_loop()try:event_loop.run_until_complete(consumer(redis_conn=redis_conn, queue_name=queue_name))except KeyboardInterrupt as e:event_loop.stop()event_loop.run_forever()finally:event_loop.close()redis_conn.close()if __name__ == '__main__':main()

    2. asyncio + aiohttp

    import aiohttp import asyncioasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:response = await r.text(encoding="utf-8")# 或者直接await r.read()不編碼,直接讀取,適合于圖像等無法編碼文件# data = await r.read()# print(url, data)print(url, r.status)print(url, response)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

    示例:

    參考:深入理解協程(四):async/await異步爬蟲實戰:https://www.cnblogs.com/ghostlee/p/12208564.html

    import asyncio import time import aiohttp from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406', ]async def async_get_url(url):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}async with aiohttp.ClientSession() as session: # 解釋1async with session.get(url, headers=headers) as r:html = await r.read()try:title = etree.HTML(html).xpath('//h1[@class="title-article"]/text()')[0]print(title)except IndexError:print(f'Fail URL: {r.url}')def async_main():loop = asyncio.get_event_loop()tasks = [async_get_url(url) for url in urls]loop.run_until_complete(asyncio.wait(tasks))# loop.close()if __name__ == '__main__':start = time.time()async_main()print(f'cost time: {time.time() - start}s')

    運行結果:

    Fail URL: https://blog.csdn.net/weixin_44339238/article/details/103977138 網頁實現一個簡單的音樂播放器(大佬別看。(⊙﹏⊙)) AES中ECB模式的加密與解密(Python3.7) 【程序人生】程序員接私活常用平臺匯總 OOM別慌,手把手教你定位 致 Python 初學者 【圖解算法面試】記一次面試:說說游戲中的敏感詞過濾是如何實現的? 8年經驗面試官詳解 Java 面試秘訣 4G EPS 的網絡協議棧 【歷史總結】Android-Universal-Image-Loader源碼分析 你不得不了解的卷積神經網絡發展史 java進階(四)------java編程規范---代碼質量檢測工具FindBugs、PMD和CheckStyle的安裝 中國數據庫OceanBase登頂之路 cost time: 0.5409884452819824s

    解釋1:此處為異步的上下文管理器,是aiohttp官方文檔提供的寫法。如果對上下文管理器不是很了解的話,可以參看【吃透Python上下文管理器】。

    用時:0.5409884452819824s。從兩種爬蟲的輸出結果中可以看到:

    • 文章標題的順序不同。同步爬蟲會按照urls內部的url順序依次爬取文章標題。而異步爬蟲爬取的順序并不完全和urls中的url順序相同。
    • 爬取速度差異很大。異步爬蟲速度大概是普通同步爬蟲的8~10倍。異步爬蟲充分利用了網絡請求這段時間。從而提高了爬取效率。

    關于?aiohttp?的更多用法,推薦閱讀

    深入理解協程(一):協程的引入

    深入理解協程(二):yield from實現異步協程

    深入理解協程(三):async/await實現異步協程

    Python進階:上下文管理器

    3. asyncio + requests?

    import asyncio import requestsasync def fetch_async(func, *args, **kwargs):inner_loop = asyncio.get_event_loop()future = inner_loop.run_in_executor(None, func, *args)response = await futureprint(response.url, response.content)if __name__ == '__main__':tasks = [fetch_async(requests.get, 'https://www.cnblogs.com/wupeiqi/'),fetch_async(requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

    示例:

    import functools # at the top with the other imports import asyncio import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)async def fetch_async(lp, func, *args, **kwargs):# inner_loop = asyncio.get_event_loop()# future = inner_loop.run_in_executor(None, func, *args)print(f'*args: {args}')future = lp.run_in_executor(None, functools.partial(func, *args, **kwargs))response = await futureprint(response.url, response.content)if __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [fetch_async(loop, requests.get, 'https://www.cnblogs.com/wupeiqi/', verify=False),fetch_async(loop, requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

    示例:

    task 協程Future 橋梁

    import requests import asyncio from concurrent import futures''' 使用多線程:在協程中集成阻塞io原型 :awaitable loop.run_in_executor(executor, func, *args) 參數 : executor 可以是 ThreadPoolExecutor / ProcessPool , 如果是None 則使用默認線程池 可使用 yield from 或 await 掛起函數 作用 : 例如在異步事件循環中的寫文件操作(或者其他IO操作), 這種慢的操作交給線程去做 '''def get_url(r_url=None):r = requests.get(url=r_url, verify=False)print(r.json())if __name__ == "__main__":loop = asyncio.get_event_loop()executor = futures.ThreadPoolExecutor(3)tasks = []for index in range(20):url = f"http://shop.projectsedu.com/goods/{index}/"task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))

    示例:

    import asyncio from concurrent import futuresdef block_func():with open("c:/test.txt", 'rb') as fd:return fd.read(500)async def todo(lp: asyncio.AbstractEventLoop):reader = await lp.run_in_executor(None, block_func) # 默認線程池print("reader:", reader)with futures.ThreadPoolExecutor() as ex:reader = await lp.run_in_executor(ex, block_func) # 自己創建一個線程池讓事件循環調用print("reader :", reader)loop = asyncio.get_event_loop() loop.run_until_complete(todo(loop))

    4. gevent + requests

    from gevent import monkey monkey.patch_all()import gevent import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_async(method, url, **req_kwargs):print(method, url, req_kwargs)response = requests.request(method=method, url=url, **req_kwargs)# print(f'{response.url}, {response.content}')print(f'{response.url}, {response.status_code}')def test_1():# ##### 發送請求 #####gevent.joinall([gevent.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),gevent.spawn(fetch_async, method='get', url='https://github.com/', verify=False),])def test_2():# #### 發送請求(協程池控制最大協程數量) #####from gevent.pool import Poolpool = Pool(None)gevent.joinall([pool.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.github.com/', verify=False),])if __name__ == '__main__':# test_1()test_2()

    5. grequests

    import grequestsrequest_list = [grequests.get('http://httpbin.org/delay/1', timeout=0.001),grequests.get('http://fakedomain/'),grequests.get('http://httpbin.org/status/500') ]# ##### 執行并獲取響應列表 ##### # response_list = grequests.map(request_list) # print(response_list)# ##### 執行并獲取響應列表(處理異常) ##### def exception_handler(request, exception):print(request, exception)print("Request failed")response_list = grequests.map(request_list, exception_handler=exception_handler) print(response_list)

    6. Twisted 示例

    twisted學習筆記No.3 Web Clients:https://www.cnblogs.com/tracylining/p/3353808.html

    from twisted.web.client import Agent from twisted.web.client import defer from twisted.internet import reactordef all_done(arg):reactor.stop()def callback(contents):print(contents)deferred_list = []url_list = ['http://www.bing.com', 'http://www.baidu.com', ] for url in url_list:agent = Agent(reactor)d = agent.request(b'GET', url.encode('utf-8'))# d.addCallbacks(printResource, printError)d.addCallback(callback)deferred_list.append(d)d_list = defer.DeferredList(deferred_list) d_list.addBoth(all_done) reactor.run()

    7. Tornado

    from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import HTTPRequest from tornado import ioloop from tornado.httpclient import HTTPResponsedef handle_response(response: HTTPResponse):"""處理返回值內容(需要維護計數器,來停止IO循環),調用 ioloop.IOLoop.current().stop():param response::return:"""if response.error:print("Error:", response.error)else:# print(response.body)print(f'{response.effective_url} : status_code : {response.code}')def func():url_list = ['https://www.baidu.com','https://www.bing.com',]for url in url_list:print(url)http_client = AsyncHTTPClient()http_client.fetch(HTTPRequest(url), handle_response)ioloop.IOLoop.current().add_callback(func) ioloop.IOLoop.current().start()

    8. Twisted 更多

    from twisted.internet import reactor from twisted.web.client import getPage from urllib import parsedef one_done(arg: bytes):print(arg.decode('utf-8'))reactor.stop()post_data = parse.urlencode({'check_data': 'adf'}) post_data = bytes(post_data, encoding='utf8') headers = {b'Content-Type': b'application/x-www-form-urlencoded'} response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),method=bytes('POST', encoding='utf8'), postdata=post_data, cookies={}, headers=headers ) response.addBoth(one_done) reactor.run()

    以上均是 Python 內置以及第三方模塊提供異步IO請求模塊,使用簡便大大提高效率,而對于異步IO請求的本質則是【非阻塞Socket】+【IO多路復用】.

    9.史上最牛逼的異步IO模塊( select、poll、epoll?)

    select

    import select import socket import timeclass AsyncTimeoutException(TimeoutError):"""請求超時異常類"""def __init__(self, msg):self.msg = msgsuper(AsyncTimeoutException, self).__init__(msg)class HttpContext(object):"""封裝請求和相應的基本數據"""def __init__(self, sock, host, port, method, url, data, callback, timeout=5):"""sock: 請求的客戶端socket對象host: 請求的主機名port: 請求的端口port: 請求的端口method: 請求方式url: 請求的URLdata: 請求時請求體中的數據callback: 請求完成后的回調函數timeout: 請求的超時時間"""self.sock = sockself.callback = callbackself.host = hostself.port = portself.method = methodself.url = urlself.data = dataself.timeout = timeoutself.__start_time = time.time()self.__buffer = []def is_timeout(self):"""當前請求是否已經超時"""current_time = time.time()if (self.__start_time + self.timeout) < current_time:return Truedef fileno(self):"""請求sockect對象的文件描述符,用于select監聽"""return self.sock.fileno()def write(self, data):"""在buffer中寫入響應內容"""self.__buffer.append(data)def finish(self, exc=None):"""在buffer中寫入響應內容完成,執行請求的回調函數"""if not exc:response = b''.join(self.__buffer)self.callback(self, response, exc)else:self.callback(self, None, exc)def send_request_data(self):content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (self.method.upper(), self.url, self.host, self.data,)return content.encode(encoding='utf8')class AsyncRequest(object):def __init__(self):self.fds = []self.connections = []def add_request(self, host, port, method, url, data, callback, timeout):"""創建一個要請求"""client = socket.socket()client.setblocking(False)try:client.connect((host, port))except BlockingIOError as e:pass# print('已經向遠程發送連接的請求')req = HttpContext(client, host, port, method, url, data, callback, timeout)self.connections.append(req)self.fds.append(req)def check_conn_timeout(self):"""檢查所有的請求,是否有已經連接超時,如果有則終止"""timeout_list = []for context in self.connections:if context.is_timeout():timeout_list.append(context)for context in timeout_list:context.finish(AsyncTimeoutException('請求超時'))self.fds.remove(context)self.connections.remove(context)def running(self):"""事件循環,用于檢測請求的socket是否已經就緒,從而執行相關操作"""while True:r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)if not self.fds:returnfor context in r:sock = context.sockwhile True:try:data = sock.recv(8096)if not data:self.fds.remove(context)context.finish()breakelse:context.write(data)except BlockingIOError as e:breakexcept TimeoutError as e:self.fds.remove(context)self.connections.remove(context)context.finish(e)breakfor context in w:# 已經連接成功遠程服務器,開始向遠程發送請求數據if context in self.fds:data = context.send_request_data()context.sock.sendall(data)self.connections.remove(context)self.check_conn_timeout()if __name__ == '__main__':def callback_func(context, response, ex):""":param context: HttpContext對象,內部封裝了請求相關信息:param response: 請求響應內容:param ex: 是否出現異常(如果有異常則值為異常對象;否則值為None):return:"""print(context, response, ex)obj = AsyncRequest()url_list = [{'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},]for item in url_list:print(item)obj.add_request(**item)obj.running()

    示例 2:

    import socket import selectclass HttpRequest:def __init__(self, sk, host, callback):self.socket = skself.host = hostself.callback = callbackdef fileno(self):return self.socket.fileno()class AsyncRequest:def __init__(self):self.conn = []self.connection = [] # 用于檢測是否已經連接成功def add_request(self, host, callback):try:sk = socket.socket()sk.setblocking(0)sk.connect((host, 80,))except BlockingIOError as e:passrequest = HttpRequest(sk, host, callback)self.conn.append(request)self.connection.append(request)def run(self):while True:rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)for w in wlist:print(w.host, '連接成功...')# 只要能循環到,表示socket和服務器端已經連接成功tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)w.socket.send(bytes(tpl, encoding='utf-8'))self.connection.remove(w)for r in rlist:# r,是HttpRequestrecv_data = bytes()while True:try:chunck = r.socket.recv(8096)recv_data += chunckexcept Exception as e:breakr.callback(recv_data)r.socket.close()self.conn.remove(r)if len(self.conn) == 0:breakdef f1(data):print('保存到文件', data)def f2(data):print('保存到數據庫', data)url_list = [{'host': 'www.baidu.com', 'callback': f1},{'host': 'cn.bing.com', 'callback': f2},{'host': 'www.cnblogs.com', 'callback': f2}, ]req = AsyncRequest() for item in url_list:req.add_request(item['host'], item['callback'])req.run()

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的【进阶】 --- 多线程、多进程、异步IO实用例子的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。