python grequests极限_Python grequests闲话
前段時間看到這個grequests庫,感覺還是蠻有意思的,所以今天來對這個庫拆解拆解。這個庫是崇拜的大神kennethreitz寫的。Github地址:https://github.com/kennethreitz/grequests
首先看到文檔上給的示例:
import grequests
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://fakedomain/',
'http://kennethreitz.com'
]
# 創建沒有發送的request集合
rs = (grequests.get(u) for u in urls)
# 發送
grequests.map(rs)
# 為了防止超時和異常發生,可以指定一個異常處理器
def exception_handler(request, exception):
print("Request failed")
reqs = [
grequests.get('http://httpbin.org/delay/1', timeout=0.001),
grequests.get('http://fakedomain/'),
grequests.get('http://httpbin.org/status/500')]
grequests.map(reqs, exception_handler=exception_handler)
另外,可以使用imap來提高性能
根據這個示例,我們來看看源代碼
AsyncRequest
首先來看grequests.get:
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
它和其他諸多HTTP方法一樣,只是一個快捷方式,其本質是調用了AsyncRequest,看到這個名字就應該知道是異步的Request,所以應該是對普通的Request做了封裝和修改:
class AsyncRequest(object):
""" 異步的Request,接收和Session.request相同的參數,還有一些額外的參數
session: 發送請求的session
callback: 在返回對象上的回調函數,和傳遞hooks={'response': callback}一樣
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
# requests里的Session對象
self.session = Session()
callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}
#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting `` Response``
self.response = None
可以看到使用partial(AsyncRequest, 'GET')會使得method默認為GET,然后再rs = (grequests.get(u) for u in urls)會形成一個生成器,里面生成AsyncRequest對象。下面來看看這些request是如何發送的:
map
之后程序會調用grequests.map(rs),那么我們來看看map:
def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
"""并發的將Requests列表轉換成響應
requests: Request對象的集合
stream: 如果為True,那么響應內容不會立即下載
size: 指定同時發起的請求數目,如果為None,就不會有限制
exception_handler: 回調函數,當異常發生的時候調用,參數是Request和Exception
gtimeout: Gevent合并所有的超時時間,單位為秒(與每個request的超時時間無關)
"""
# 將生成器直接轉換成list
requests = list(requests)
# gevent的Pool對象,是時候研究一波gevent了
pool = Pool(size) if size else None
# 調用send函數來發送請求
jobs = [send(r, pool, stream=stream) for r in requests]
# 等待所有的greenlet處理單元結束運行
gevent.joinall(jobs, timeout=gtimeout)
ret = []
# 處理所有的請求響應,并且處理異常
for request in requests:
if request.response is not None:
ret.append(request.response)
# 如果有異常處理器并且request有異常進行處理
elif exception_handler and hasattr(request, 'exception'):
ret.append(exception_handler(request, request.exception))
else:
# 否則結果置為None
ret.append(None)
return ret
可以看到map函數很簡單,大體流程就是建立了greenlet處理器池,然后對每個request進行調用,然后等待結束,最后得到響應并且處理響應。
# 如果給定size則創建greenlet池,否則為None
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs, timeout=gtimeout)
所以問題的關鍵還是在于gevent那幾行的調用,創建管理greenlet的池,用來限制并發,創建好了之后,調用這個pool然后去發送請求,最后等待所有的greenlet結束。
Gevent的并發
無論創建還是沒有創建池,最終是要調用send方法的,來看看這個函數:
def send(r, pool=None, stream=False):
"""使用指定的pool發送request對象,如果pool沒有指定,這個方法就會阻塞,Pools很有用,因為你可以指定并發限制"""
if pool is not None:
return pool.spawn(r.send, stream=stream)
return gevent.spawn(r.send, stream=stream)
gevent.spawn
gevent.spawn創建一個新的Greenlet對象,并且排定運行調用function(*args, **kwargs),這個可以使用gevent.spawn或者是Greenlet.spawn,其實gevent.spawn就是Greenlet.spawn,并且最后會調用Greenlet的類方法,首先實例化一個對象,然后調用start方法,所以也相當于調用Greenlet(*args, **kwargs)
這也是類方法的一個用法,另外的實例化對象的方法
@classmethod
def spawn(cls, *args, **kwargs):
g = cls(*args, **kwargs)
g.start()
return g
pool.spawn
這個方法使用給定的參數開始一個新的greenlet,通常是傳遞給Greenlet構造函數,并且將其加入這個pool管理的greenlets集合
Pool是Group的子類,提供了限制并發的方法,其spawn方法在greenlets數目達到上限的時候阻塞,直到有一個可用的greenlet。
這個方法也是使用pool實例為Greenlet創建一個實例,然后start it
def spawn(self, *args, **kwargs):
greenlet = self.greenlet_class(*args, **kwargs)
self.start(greenlet)
return greenlet
r.send
這個是AsyncRequest的send方法,比較簡單,就是發送請求,等待響應。
def send(self, **kwargs):
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
try:
self.response = self.session.request(self.method, self.url, **merged_kwargs)
except Exception as e:
self.exception = e
self.traceback = traceback.format_exc()
return self
imap
imap據說可以提高性能,快來看看吧:
def imap(requests, stream=False, size=2, exception_handler=None):
"""并發的將Request對象的生成器轉換成響應的生成器。
requests: Request對象的生成器
stream: 如果為True,則不會立即自動下載
size: 同時發起的請求數,默認為2
exception_handler: 當發生異常時候回調
"""
pool = Pool(size)
def send(r):
return r.send(stream=stream)
for request in pool.imap_unordered(send, requests):
if request.response is not None:
yield request.response
elif exception_handler:
exception_handler(request, request.exception)
pool.join()
可以看到這個函數主要是使用了pool.imap_unordered,其實pool還有一個方法是imap
pool.imap
和itertools.imap()是一致的,itertools.imap()可以用于迭代無窮序列,比如itertools.imap(lambda x, y: x * y, [10, 20, 30], itertools.count(1)),如果兩個序列長短不一致,以短的為準,并且imap實現了惰性計算,類似生成器。
pool.imap可以并行運行,按順序從迭代對象中取出元素迭代,應用在函數上,然后收集結果。
如果限制了可以同時進行的greenlets數量,那么最多只有這么多個任務同時進行。
pool.imap_unordered
和imap一樣,返回的結果順序是隨意的,比起imap更加輕量級,如果順序不重要的話,首先應該選用這個。
join
等待這個group的greenlets都運行完,如果這個group沒有greenlet的話,立即返回
可以看到,如果不要求順序的話,imap_unordered會比imap更加高效,同時imap版本肯定比map版本性能好,因為map版本必須全部運行完才能拿數據,但是imap版本只要有greenlet有結果就可以取出來。
小結
這個庫就這么多內容,其實主要是使用gevent封裝了一層requests,所以核心就是使用gevent,gevent怎么用,如何用,待我繼續研究。
不過通過看這個庫,也了解到了簡單的gevent的用法
總結
以上是生活随笔為你收集整理的python grequests极限_Python grequests闲话的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑键盘上的Alt键有何妙用
- 下一篇: python css selector_