多线程与多进程爬虫
多線程與多進程爬蟲
- threading
- Thread類與線程函數(shù)
- Thread 類與線程對象
- 從Tread 類繼承
- 線程鎖
- 信號量
- 信號量與鎖結(jié)合
- 生產(chǎn)者--消費者問題與queue模塊
- 爬取豆瓣電影詳情
- 網(wǎng)頁分析
threading
Thread類與線程函數(shù)
如果使用Thread類處理線程就方便得多了,可以直接使用Thread對象的join方法等待線程函數(shù)執(zhí)行完畢再往下執(zhí)行,也就是說,在主線程(main函數(shù))中調(diào)用Thread對象的join方法,并且Thread對象的線程函數(shù)沒有執(zhí)行完畢,主線程會處于阻塞狀態(tài)。
使用Thread類也很簡單,首先需要創(chuàng)建Thread類的實例,通過Thread類構(gòu)造方法的target關(guān)鍵字參數(shù)執(zhí)行線程函數(shù),通過args關(guān)鍵字參數(shù)指定傳給線程函數(shù)的參數(shù)。然后調(diào)用Thread對象的start方法啟動線程。
樣例:
import threading from time import sleep, ctime # 線程函數(shù),index表示整數(shù)類型的索引,sec表示休眠時間,單位:秒 def fun(index, sec):print('開始執(zhí)行', index, ' 時間:', ctime())# 休眠sec秒sleep(sec)print('結(jié)束執(zhí)行', index, '時間:', ctime()) def main():# 創(chuàng)建第1個Thread對象,通過target關(guān)鍵字參數(shù)指定線程函數(shù)fun,傳入索引10和休眠時間(4秒)thread1 = threading.Thread(target=fun,args=(10, 4))# 啟動第1個線程thread1.start()# 創(chuàng)建第2個Thread對象,通過target關(guān)鍵字參數(shù)指定線程函數(shù)fun,傳入索引20和休眠時間(2秒)thread2 = threading.Thread(target=fun,args=(20, 2))# 啟動第2個線程thread2.start()# 等待第1個線程函數(shù)執(zhí)行完畢thread1.join()# 等待第2個線程函數(shù)執(zhí)行完畢thread2.join()if __name__ == '__main__':main()Thread 類與線程對象
Thread類構(gòu)造方法的target關(guān)鍵字參數(shù)不僅可以是一個函數(shù),還可以是一個對象,可以稱這個對象為線程對象。其實線程調(diào)用的仍然是函數(shù),只是這個函數(shù)用對象進行了封裝。這么做的好處是可以將與線程函數(shù)相關(guān)的代碼都放在對象對應(yīng)的類中,這樣更能體現(xiàn)面向?qū)ο蟮姆庋b性。
線程對象對應(yīng)的類需要有一個可以傳入線程函數(shù)和參數(shù)的構(gòu)造方法,而且在類中還必須有一個名為“_ call _” 的方法。當(dāng)線程啟動時,會自動調(diào)用線程對象的“_ call _”方法,然后在該方法中調(diào)用線程函數(shù)。
代碼:
import threading from time import sleep, ctime # 線程對象對應(yīng)的類 class MyThread(object):# func表示線程函數(shù),args表示線程函數(shù)的參數(shù)def __init__(self, func, args):# 將線程函數(shù)與線程函數(shù)的參數(shù)賦給當(dāng)前類的成員變量self.func = funcself.args = args# 線程啟動時會調(diào)用該方法def __call__(self):# 調(diào)用線程函數(shù),并將元組類型的參數(shù)值分解為單個的參數(shù)值傳入線程函數(shù)self.func(*self.args) # 線程函數(shù) def fun(index, sec):print('開始執(zhí)行', index, ' 時間:', ctime())# 延遲sec秒sleep(sec)print('結(jié)束執(zhí)行', index, '時間:', ctime()) def main():print('執(zhí)行開始時間:', ctime())# 創(chuàng)建第1個線程,通過target關(guān)鍵字參數(shù)指定了線程對象(MyThread),延遲4秒thread1 = threading.Thread(target = MyThread(fun,(10, 4)))# 啟動第1個線程thread1.start()# 創(chuàng)建第2個線程,通過target關(guān)鍵字參數(shù)指定了線程對象(MyThread),延遲2秒thread2 = threading.Thread(target = MyThread(fun,(20, 2)))# 啟動第2個線程thread2.start()# 創(chuàng)建第3個線程,通過target關(guān)鍵字參數(shù)指定了線程對象(MyThread),延遲1秒thread3 = threading.Thread(target = MyThread(fun,(30, 1)))# 啟動第3個線程thread3.start()# 等待第1個線程函數(shù)執(zhí)行完畢thread1.join()# 等待第2個線程函數(shù)執(zhí)行完畢thread2.join()# 等待第3個線程函數(shù)執(zhí)行完畢thread3.join()print('所有的線程函數(shù)已經(jīng)執(zhí)行完畢:', ctime()) if __name__ == '__main__':main()從Tread 類繼承
為了更好地對與線程有關(guān)的代碼進行封裝,可以從Thread類派生一個子類。然后將與線程有關(guān)的代碼都放到這個類中。Thread類的子類的使用方法與Thread相同。從Thread類繼承最簡單的方式是在子類的構(gòu)造方法中通過
super( )函數(shù)調(diào)用父類的構(gòu)造方法,并傳入相應(yīng)的參數(shù)值。
示例:
import threading from time import sleep, ctime# 從Thread類派生的子類 class MyThread(threading.Thread):# 重寫父類的構(gòu)造方法,其中func是線程函數(shù),args是傳入線程函數(shù)的參數(shù),name是線程名def __init__(self, func, args, name=''):# 調(diào)用父類的構(gòu)造方法,并傳入相應(yīng)的參數(shù)值super().__init__(target=func, name=name,args=args)# 重寫父類的run方法def run(self):self._target(*self._args)# 線程函數(shù) def fun(index, sec):print('開始執(zhí)行', index, '時間:', ctime())# 休眠sec秒sleep(sec)print('執(zhí)行完畢', index, '時間:', ctime())def main():print('開始:', ctime())# 創(chuàng)建第1個線程,并指定線程名為“線程1”thread1 = MyThread(fun, (10, 4), '線程1')# 創(chuàng)建第2個線程,并指定線程名為“線程2”thread2 = MyThread(fun, (20, 2), '線程2')# 開啟第1個線程thread1.start()# 開啟第2個線程thread2.start()# 輸出第1個線程的名字print(thread1.name)# 輸出第2個線程的名字print(thread2.name)# 等待第1個線程結(jié)束thread1.join()# 等待第2個線程結(jié)束thread2.join()print('結(jié)束:', ctime())if __name__ == '__main__':main()線程鎖
線程鎖的目的是將一段代碼鎖住,一旦獲得了鎖權(quán)限,除非釋放線程鎖,否則其他任何代碼都無法再次獲得鎖權(quán)限。為了使用線程鎖,首先需要創(chuàng)建Lock類的實例,然后通過Lock對象的acquire方法獲取鎖權(quán)限,當(dāng)需要完成原子操作的代碼段執(zhí)行完后,再使用Lock對象的release方法釋放鎖,這樣其代碼就可以再次獲得這個鎖權(quán)限了。
要注意的是,鎖對象要放到線程函數(shù)的外面作為一個全局變量,這樣所有的線程函數(shù)實例都可以共享這個變量,如果將鎖對象放到線程函數(shù)內(nèi)部,那么這個鎖對象就變成局部變量了,多個線程函數(shù)實例使用的是不同的鎖對象,所以仍然不能有效保護原子操作的代碼。
示例:
from atexit import register import random from threading import Thread,Lock,currentThread from time import sleep,ctime#創(chuàng)建線程鎖對象 lock = Lock()def fun():#獲取線程鎖權(quán)限lock.acquire()#for循環(huán)已經(jīng)變成了原子操作for i in range(5):print('Thread Name','=',currentThread().name,'i','=',i)# 休眠一段時間1~4sleep(random.randint(1,5))#釋放線程鎖lock.release()def main():for i in range(3):Thread(target=fun).start()#當(dāng)線程結(jié)束時調(diào)用這個函數(shù) @register #路由def exit():print('線程執(zhí)行完畢:',ctime())if __name__ == "__main__":main()信號量
信號量是最古老的同步原語之一,它是一個計數(shù)器,用于記錄資源消耗情況。當(dāng)資源消耗時遞減,當(dāng)資源釋放時遞增。可以認(rèn)為信號量代表資源是否可用。消耗資源使計數(shù)器遞減的操作習(xí)慣上稱為P,當(dāng)一個線程對一個資源完成操作時,該資源需要返回資源池,這個操作一般稱為V。
Python語言統(tǒng)一了所有的命名,使用與線程鎖同樣的方法名消耗和釋放資源。acquire方法用于消耗資源,調(diào)用該方法計數(shù)器會減1,release方法用于釋放資源,調(diào)用該方法計數(shù)器會加 1。
使用信號量首先要創(chuàng)建Bounded Semaphore類的實例,并且通過該類的構(gòu)造方法傳入計數(shù)器的最大值,然后就可以使用BoundedSemphor對象的acquire方法和release方法獲取資源(計數(shù)器減1)和釋放資源(計數(shù)器加1)了。
示例:
from threading import BoundedSemaphore MAX = 3 # 創(chuàng)建信號量對象,并設(shè)置了計數(shù)器的最大值(也是資源的最大值),計數(shù)器不能超過這個值 semaphore = BoundedSemaphore(MAX) # 輸出當(dāng)前計數(shù)器的值,輸出結(jié)果:3 print(semaphore._value) # 獲取資源,計數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:2 print(semaphore._value) # 獲取資源,計數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:1 print(semaphore._value) # 獲取資源,計數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:0 print(semaphore._value) # 當(dāng)計數(shù)器為0時,不能再獲取資源,所以acquire方法會返回False # 輸出結(jié)果:False print(semaphore.acquire(False)) # 輸出結(jié)果:0 print(semaphore._value) # 釋放資源,計數(shù)器加1 semaphore.release() # 輸出結(jié)果:1 print(semaphore._value) # 釋放資源,計數(shù)器加1 semaphore.release() # 輸出結(jié)果:2 print(semaphore._value) # 釋放資源,計數(shù)器加1 semaphore.release() # 輸出結(jié)果:3 print(semaphore._value) # 拋出異常,當(dāng)計數(shù)器達到最大值時,不能再次釋放資源,否則會拋出異常 semaphore.release()要注意的是信號量對象的acquire方法與release方法。當(dāng)資源枯竭(計數(shù)器為0)時調(diào)用acquinte方法會有兩種結(jié)果。
第1種是acquire方法的參數(shù)值為True或不指定參數(shù)時, acquire方法會處于阻塞狀態(tài),直到使用release釋放資源后,acquire方法才會往下執(zhí)行。
第2種acquire方法的參數(shù)值為False,當(dāng)計數(shù)器為0時調(diào)用acquire方法并不會阻塞,而是直接返回False,表示未獲得資源,如果成功獲得資源,會返回True。
release方法在釋放資源時,如果計數(shù)器已經(jīng)達到了最大值(本例是3),會直接拋出異常,表示已經(jīng)沒有資源釋放了。
信號量與鎖結(jié)合
示例:
from atexit import register from random import randrange from threading import BoundedSemaphore, Lock, Thread from time import sleep, ctime # 創(chuàng)建線程鎖 lock = Lock() # 定義糖果機的槽數(shù),也是信號量計數(shù)器的最大值 MAX = 5 # 創(chuàng)建信號量對象,并指定計數(shù)器的最大值 candytray = BoundedSemaphore(MAX) # 給糖果機的槽補充新的糖果(每次只補充一個槽) def refill():# 獲取線程鎖,將補充糖果的操作變成原子操作lock.acquire()print('重新添加糖果...', end=' ')try:# 為糖果機的槽補充糖果(計數(shù)器加1)candytray.release()except ValueError:print('糖果機都滿了,無法添加')else:print('成功添加糖果')# 釋放線程鎖lock.release() # 顧客購買糖果 def buy():# 獲取線程鎖,將購買糖果的操作變成原子操作lock.acquire()print('購買糖果...', end=' ')# 顧客購買糖果(計數(shù)器減1),如果購買失敗(5個槽都沒有糖果了),返回Falseif candytray.acquire(False):print('成功購買糖果')else:print('糖果機為空,無法購買糖果')# 釋放線程鎖lock.release() # 產(chǎn)生多個補充糖果的動作 def producer(loops):for i in range(loops):refill()sleep(randrange(3)) # 產(chǎn)生多個購買糖果的動作 def consumer(loops):for i in range(loops):buy()sleep(randrange(3))def main():print('開始:', ctime())# 參數(shù)一個2到5的隨機數(shù)nloops = randrange(2, 6)print('糖果機共有%d個槽!' % MAX)# 開始一個線程,用于執(zhí)行consumer函數(shù)Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()# 開始一個線程,用于執(zhí)行producer函數(shù)Thread(target=producer, args=(nloops,)).start()@register def exit():print('程序執(zhí)行完畢:', ctime())if __name__ == '__main__':main()運行結(jié)果:
生產(chǎn)者–消費者問題與queue模塊
本節(jié)使用線程鎖以及隊列來模擬一個典型的案例:生產(chǎn)者一消費者模型。在這個場景下,商品或服務(wù)的生產(chǎn)者生產(chǎn)商品、然后將其放到類似隊列的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)商品的時間是不確定的.同樣消費者消費生產(chǎn)者生產(chǎn)的商品的時間也是不確定的。
這里使用queue模塊來提供線程間通信的機制,也就是說,生產(chǎn)者和消費者共享一個隊列。生產(chǎn)者生產(chǎn)商品后,會將商品添加到隊列中。消費者消費商品,會從隊列中取一個商品。由于向隊列中添加商品和從隊列中獲取商品都不是原子操作,所以需要使用線程鎖將這兩個操作鎖住。
代碼:
```python from random import randrange from time import sleep,time,ctime from threading import Lock,Thread from queue import Queue# 創(chuàng)建線程鎖對象 lock = Lock()# 從Therad 派生的子類 class MyTherad(Thread):def __init__(self,func,args):super().__init__(target= func , args= args)# 向隊列添加商品 def writeQ(queue):# 獲取線程鎖lock.acquire()print('生產(chǎn)了一個對象,并將其添加到隊列中', end=' ')# 向隊列中添加商品queue.put('商品')print("隊列尺寸", queue.qsize())# 釋放線程鎖lock.release()# 從隊列中獲取商品 def readQ(queue):# 獲取線程鎖lock.acquire()# 從隊列中獲取商品val = queue.get(1)print('消費了一個對象,隊列尺寸:', queue.qsize())# 釋放線程鎖lock.release()#生產(chǎn)若干個生產(chǎn)者者 def writer(queue,loops):for i in range(loops):writeQ(queue)sleep(randrange(1,4))# 生產(chǎn)若干個消費者def reader(queue,loops):for i in range(loops):readQ(queue)sleep(randrange(2,4))funcs =[writer,reader] nfuncs = range(len(funcs))def main():nloops = randrange(2,6)q = Queue(32)threads = []#創(chuàng)建2個線程運行writer 函數(shù)與reder函數(shù)for i in nfuncs:t = MyTherad(funcs[i],(q,nloops))threads.append(t)# 開始線程for i in nfuncs:threads[i].start()#等待兩個線程結(jié)束for i in nfuncs:threads[i].join()print('所以工作已經(jīng)完成')if __name__ =='__main__':main() 效果: # 多進程 盡管多線程可以實現(xiàn)并發(fā)執(zhí)行,不過多個線程之間是共享當(dāng)前進程的內(nèi)存的,也就是說,線程可以申請到的資源有限。要想進一步發(fā)揮并發(fā)的作用,可以考慮使用多進程。如果建立的進程比較多,可以使用`multiprocessing模塊的進程池(Pool類)`,通過Pool類構(gòu)造方法的processes函數(shù),可以指定創(chuàng)建的進程數(shù)。Pool類有一個map方法,用于將回調(diào)函數(shù)與要給回調(diào) 函數(shù)傳遞的數(shù)據(jù)管理起來,代碼如下:```python pool = Pool(processes=4) pool.map(callback_fun,values)上面的代碼利用Pool對象創(chuàng)建了4個進程,并通過map方法指定了進程回調(diào)函數(shù),當(dāng)進程執(zhí)行時,就會調(diào)用這個函數(shù),values是一個可迭代對象,每次進程運行時,就會從values中取一個值傳遞給callback _ fun,也就是說,callback fun函數(shù)至少要有一個參數(shù)接收values中的值。
示例:
from multiprocessing import Pool import time# 進程回調(diào)函數(shù) def get_value(value):i = 0while i <3:#休眠一秒time.sleep(1)print(value,i)i+=1if __name__ =='__main__':#產(chǎn)生5個值,供多線程獲取values =['value{}'.format(str(i)) for i in range(0,5)]# 創(chuàng)建4個進程pool = Pool(processes=4)#將進程回調(diào)函數(shù)與values關(guān)聯(lián)pool.map(get_value,values)爬取豆瓣電影詳情
網(wǎng)頁分析
因為電影分類上的數(shù)據(jù)是異步的所以我們,在XHR中找到真實的網(wǎng)址
https://movie.douban.com/j/chart/top_list?type=11&interval_id=100%3A90&action=&start=20&limit=20發(fā)現(xiàn)每一個分類中的網(wǎng)址只有兩個地方是不一樣的
- type=11
- start=20
而 type = 11 ,這個11是和分類這個連接中的type是一樣的,
start=20 是什么意思呢,通過分析,這個是每一次會獲取20個電影信息,就是說每一次下滑,會一次性返回20個;
每一個對應(yīng)一個電影的數(shù)據(jù),是json格式。需要轉(zhuǎn)換。
代碼:
import json, threading import re, requests from lxml import etree from queue import Queueclass DouBan(threading.Thread):#重寫父類的構(gòu)造函數(shù)def __init__(self, q=None):super().__init__()self.base_url = 'https://movie.douban.com/chart'self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36','Referer': 'https://movie.douban.com/explore'}self.q = qself.ajax_url = 'https://movie.douban.com/j/chart/top_list?type={}&interval_id=100%3A90&action=&start={}&limit=20'# 獲取網(wǎng)頁的源碼def get_content(self, url, headers):response = requests.get(url, headers=headers)return response.text# 獲取電影指定信息def get_movie_info(self, text):# 將json格式轉(zhuǎn)換為Python的字典text = json.loads(text)item = {}for data in text:score = data['score']image = data['cover_url']title = data['title']actors = data['actors']detail_url = data['url']vote_count = data['vote_count']types = data['types']item['評分'] = scoreitem['圖片'] = imageitem['電影名'] = titleitem['演員'] = actorsitem['詳情頁鏈接'] = detail_urlitem['評價數(shù)'] = vote_countitem['電影類別'] = typesprint(item)# 獲取電影api數(shù)據(jù)的def get_movie(self):headers = {'X-Requested-With': 'XMLHttpRequest','User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',}# 獲取api數(shù)據(jù),并判斷分頁while True:if self.q.empty():breakn = 0while True:# 拼接成一個完整的網(wǎng)址text = self.get_content(self.ajax_url.format(self.q.get(), n), headers=headers)if text == '[]':breakself.get_movie_info(text)n += 20# 獲取所有類型的type——iddef get_types(self):html_str = self.get_content(self.base_url, headers=self.headers) # 分類頁首頁html = etree.HTML(html_str)types = html.xpath('//div[@class="types"]/span/a/@href') # 獲得每個分類的連接,但是切割type# print(types)type_list = []for i in types:p = re.compile('type=(.*?)&interval_id=') # 篩選id,拼接到api接口的路由type = p.search(i).group(1)type_list.append(type)return type_listdef run(self):self.get_movie()if __name__ == '__main__':# 創(chuàng)建消息隊列q = Queue()# 將任務(wù)隊列初始化,將我們的type放到消息隊列中t = DouBan()types = t.get_types()for tp in types:q.put(tp[0])# 創(chuàng)建一個列表,列表的數(shù)量就是開啟線程的樹木crawl_list = [1, 2, 3, 4]for crawl in crawl_list:# 實例化對象movie = DouBan(q=q)movie.start()解釋:
效果:
總結(jié)
- 上一篇: 抓取异步数据(AJAX)笔记
- 下一篇: 邻接表建立图(c语言)