Python多线程篇一,theanding库、queue队列、生产者消费者模式爬虫实战代码超详细的注释、自动分配线程对应多任务,GIF演示【傻瓜式教程】
? 簡(jiǎn)介:大家好,我是zy阿二,我是一名對(duì)知識(shí)充滿渴望的自由職業(yè)者。
?? 最近我沉溺于Python的學(xué)習(xí)中。你所看到的是我的學(xué)習(xí)筆記。
?? 如果對(duì)你有幫助,請(qǐng)關(guān)注我,讓我們共同進(jìn)步。有不足之處請(qǐng)留言指正!
認(rèn)識(shí)多線程
A:那我們以前寫的程序難道都是單線程的嘛?
Q:是的。把程序比作一個(gè)作坊。 單線程就是老板自己接單,自己安排任務(wù),自己生產(chǎn)產(chǎn)品,自己銷售。生產(chǎn)效率低,產(chǎn)值低,但是管理方便自己管自己,做完一個(gè)做下一個(gè)。
A:那多線程是什么樣子?
Q:老板接了個(gè)大單子,一個(gè)人來(lái)不及干了,怎么辦? 只能招工、請(qǐng)外援。老板從第一生產(chǎn)線退居到了第二生產(chǎn)線的管理者,管理工人生產(chǎn)。生產(chǎn)效率高了,產(chǎn)值也高了,但是帶來(lái)問(wèn)題就是如何把大量的工作合理的安排給工人呢? 請(qǐng)往下閱讀。
A: 現(xiàn)有多線程教程很多,你為什么還要寫他?
Q:正如標(biāo)題所述,自動(dòng)分配線程對(duì)應(yīng)多任務(wù)。在上看到太多的文章都是直接3行代碼開多線程。第一句for循環(huán),第二句創(chuàng)建線程任務(wù),第三句t.strat。殊不知這樣是在用每一個(gè)線程做一個(gè)任務(wù)。過(guò)度耗費(fèi)CPU資源,過(guò)高的線程并發(fā),這樣的爬蟲對(duì)目標(biāo)網(wǎng)站也是不道德的。
多線程的簡(jiǎn)單用法。
| t=threading.Thead(target=func,args=(a,)) | 創(chuàng)建一個(gè)線程對(duì)象,并給一個(gè)func任務(wù) |
| t.start | 激活多線程對(duì)象(激活 ≠ 開啟) |
| t.join | 等待線程結(jié)束 |
| threading.active_count() | 返回當(dāng)前激活線程數(shù) |
| m=threading.BoundedSemaphore(3) | 設(shè)定線程的最大數(shù)量 |
| m.acquire(timeout=5) | 超線程上鎖,超時(shí)時(shí)間5秒 |
| m.release() | 解鎖一個(gè)線程,寫在任務(wù)結(jié)束的地方 |
一、初試多線程:
先來(lái)看看示例代碼,單線程的。等于只有老板自己一個(gè)人在干活。生產(chǎn)一個(gè)商品需要耗費(fèi)0.3秒,所以生產(chǎn)10個(gè)需要3秒。這是單線程的效果
較真的朋友要說(shuō):不對(duì)啊,明明是3.091秒啊。。。 Python編譯代碼,開啟進(jìn)程執(zhí)行代碼都需要時(shí)間。老板安排工作是有耗時(shí)的
老板招了10個(gè)工人,每個(gè)工人生產(chǎn)1個(gè)產(chǎn)品需要1秒。10個(gè)線程同時(shí)開始工作,任務(wù)。只0.0019秒? 聰明的小伙伴就開始提問(wèn)了:
A1:就算10個(gè)人一起開始,那也應(yīng)該需要1秒才能完成任務(wù)啊。為什么只用0.19秒就打印了主線程結(jié)束呢?
Q:老板給10個(gè)工人安排任務(wù)用了0.0019秒,所以老板是主線程,他沒(méi)有其他任務(wù)所以結(jié)束的很快,但是子線程(工人)任然需要繼續(xù)工作,1秒后,子線程全部完工,同時(shí)進(jìn)程結(jié)束。所以此時(shí)的工廠有11個(gè)人,10個(gè)工人和1個(gè)老板。
A2:為什么打印的結(jié)果不是那么整齊?
Q:t.start() 是激活線程,具體開始時(shí)間取決于CPU,先激活的線程不一定就是先完成的。同時(shí)每個(gè)線程在實(shí)際情況中遇到的情況不同,所以具體完成的時(shí)間不同,打印結(jié)果也就會(huì)亂。
A3:那如果我有9999個(gè)產(chǎn)品難道要開9999個(gè)線程才可以嗎?
Q:厲害!能想到這個(gè)問(wèn)題。很多剛接觸多線程去做爬蟲的伙伴,經(jīng)常會(huì)這樣: 有100頁(yè)面要爬,然后寫多線程的時(shí)候代碼如下
細(xì)品這個(gè)代碼是什么意思? 開了100個(gè)線程?做100個(gè)任務(wù)?
如果你是老板,你會(huì)雇傭100個(gè)工人每個(gè)工人只生成一個(gè)產(chǎn)品就下班了?
所以如何使用theading模塊合理安排多線程多任務(wù),請(qǐng)往下看。
A4:示例func中為什么沒(méi)有return呢?那如何接受返回值?
Q:threading庫(kù)并沒(méi)有返回值的功能。所以我們要用其他的方法,1. 寫入硬盤。 2. 全局變量。 3. 隊(duì)列。 這也是本文要講的內(nèi)容之一。
二、多線程 threading.Thread 參數(shù)和方法:
# 先來(lái)看下 threading.Thread 中接受的參數(shù) t = threading.Thread(group=None, target=(), name=None,args=(), kwargs={}, *, daemon=True)| target | 必填,函數(shù)名或方法名。 |
| args | 元組類型數(shù)據(jù)傳參。(單個(gè)參數(shù)也需要寫成元組,如:(1,)) |
| kwargs | 字典類型數(shù)據(jù)傳參。 |
| name | 線程名,可以忽略,一般不用設(shè)置。有默認(rèn)名。 |
| group | 線程組,直接忽略,因?yàn)槟壳爸荒苁褂肗one。 |
| daemon | 布爾值,默認(rèn)False。主線程守護(hù)。True = 子線程會(huì)隨主線程一起結(jié)束 |
| t.setDaemon(True) | 也可以在后續(xù)設(shè)置線程守護(hù) |
| t.start() | 激活線程 |
| t.jion() | 等待對(duì)象線程結(jié)束。 |
| threading.current_thread() | 獲取當(dāng)前的線程名字 |
| threading.active_count() | 獲得當(dāng)前激活的線程數(shù) |
| lock = threading.BoundedSemaphore() | 限制最大線程數(shù)量鎖 |
| lock.acquire() | 上鎖 |
| lock.release() | 解鎖 |
| lock2 = threading.Lock() | 線程鎖,互斥鎖 |
| lock2.acquire() | 上鎖 |
| lock2.release() | 解鎖 |
三、多任務(wù) 分配(任務(wù)多 線程少)
不廢話,直接行代碼
import threading import timedef func():time.sleep(0.3)print('當(dāng)前線程數(shù)量:', threading.active_count())# 在完成工作后,解鎖lock.release()if __name__ == '__main__':# 創(chuàng)建一個(gè)允許最大激活線程數(shù)量為 5 的鎖# 可以理解為:做多允許出現(xiàn) 5 把鎖lock = threading.BoundedSemaphore(5)for i in range(100):# 每次開啟線程前,加一次鎖,循環(huán)5次后,這里就會(huì)等待解鎖一把后才會(huì)放行。lock.acquire()t = threading.Thread(target=func)t.start()
妙不妙?
其實(shí)這個(gè)問(wèn)題因?yàn)橛懈玫慕鉀Q方案:線程池,所以導(dǎo)致了threading模塊的這個(gè)控制最大線程的方法被雪藏。我上培訓(xùn)機(jī)構(gòu)的老師都沒(méi)教。都是直接一個(gè)for循環(huán)到底每個(gè)任務(wù)一個(gè)線程。
我也是鉆了牛角看了很多文章,突然豁然開朗。如下是我的解題經(jīng)歷:
三、如何接受返回值?(建議直接看3??)
1??、寫入硬盤存儲(chǔ)數(shù)據(jù)。這個(gè)都看不懂就先去學(xué)基礎(chǔ)吧。
很顯然這根本處理不了大數(shù)據(jù),而且效率低下。
import threadingdef func(f, i):f.write(i)if __name__ == '__main__':f = open('xxx.txt', 'a')for i in range(5):t = threading.Thread(target=func, args=(f, i))t.start()f.close()2??、全局變量
當(dāng)多個(gè)線程同時(shí)操作同一個(gè)全局變量的時(shí)候,數(shù)據(jù)將會(huì)變得不準(zhǔn)確。
而且下面的代碼實(shí)際上是一個(gè)單線程的。因?yàn)樽x數(shù)據(jù)線程雖然創(chuàng)建了,但是確在等待寫數(shù)據(jù)的線程結(jié)束后才被激活。
如果我們?nèi)サ袅?t1.join之后。保證2個(gè)線程可以同時(shí)進(jìn)行。我們來(lái)看下代碼運(yùn)行結(jié)果。
同時(shí)為了更直觀的反映問(wèn)題,我們把a(bǔ) 換成int類型,函數(shù)是讓2個(gè)線程分別給 a +1 一百萬(wàn)次
第一個(gè)線程打印的結(jié)果盡然只有158.4萬(wàn)+??這結(jié)果變得不可控了!就是多線程操作同一個(gè)全局變量在處理大量數(shù)據(jù)時(shí)必然會(huì)出現(xiàn)的問(wèn)題。那如何解決呢?
在t1.start()后面加上t1.join()確實(shí)可以解決這個(gè)問(wèn)題,但問(wèn)題是多線程變成了單線程。
如果t1有其他IO任務(wù)需要3秒,t2的也有其他的IO任務(wù)需要3秒,那么加了join后的整個(gè)線程就需要6秒才能完成。這就妥妥的偽多線程啊。
再來(lái)看一個(gè)示例:
但是實(shí)際答案確出乎意料。。這就是多線程操作同一個(gè)全局變量的問(wèn)題。那么下面來(lái)講解決方案。
2??1??、互斥鎖
lock = threading.Lock() 程序開始前創(chuàng)建一把鎖
lock.acquire() 在修改全局變量時(shí)先用此命令上鎖
lock.release() 修改結(jié)束后,再加上此命令解鎖
那為了解決上面數(shù)據(jù)不可控的情況,我們利用lock = threading.Lock() 通過(guò)創(chuàng)建鎖,上鎖,解鎖的步驟,解決了多線程和處理全局變量的問(wèn)題。但是很顯然,這樣做的優(yōu)勢(shì)是可以做到多線程,及時(shí)t1,t2,都有3秒的IO任務(wù),那么整個(gè)進(jìn)程也是只需要3秒就會(huì)完成。但是在處理全局變量時(shí),依然會(huì)出現(xiàn)t2等待t1計(jì)算結(jié)束后t2才會(huì)處理。那么到底如何才能完美解決多線程數(shù)據(jù)交互的問(wèn)題呢?
3??、queue庫(kù),隊(duì)列
標(biāo)準(zhǔn)流程第一步,安裝庫(kù) :pip install queue
隊(duì)列就是倉(cāng)庫(kù)。舉個(gè)栗子,還是那個(gè)工廠,工人們各自生產(chǎn)產(chǎn)品互不影響,但是成品需要放到一個(gè)共有的倉(cāng)庫(kù),等待老板下令發(fā)貨,隊(duì)列就是這個(gè)倉(cāng)庫(kù)。而倉(cāng)管也有發(fā)貨順序的。現(xiàn)在我們來(lái)看下3個(gè)常用的queue列隊(duì)的發(fā)貨順序:
| queue.Queue | 先進(jìn)先出 FIFO |
| queue.LifoQueue | 后進(jìn)先出 |
| queue.PriorityQueue | 自定義進(jìn)出順序 |
| queue.SimpleQueue | 簡(jiǎn)單的FIFO 隊(duì)列,缺少任務(wù)跟蹤等高級(jí)功能。 |
| q.put(x) | 添加x到隊(duì)列中,x可以是任何類型數(shù)據(jù),但是一次只能加1個(gè)數(shù)據(jù) |
| q.put(x,block=False) | 當(dāng)列隊(duì)已滿時(shí)再增加數(shù)據(jù)會(huì)報(bào)錯(cuò) queue.Full |
| q.put(x,timeout=5) | 當(dāng)隊(duì)列已滿時(shí),會(huì)最多等待5秒,如果5秒后還是沒(méi)有空位,則會(huì)報(bào)錯(cuò),queue.Full |
| q.get() | 從隊(duì)列中取數(shù)據(jù)(得到的數(shù)據(jù)由發(fā)貨順序決定) |
| q.get(block=False) | 隊(duì)列為空,仍然繼續(xù)取數(shù)據(jù),會(huì)報(bào)錯(cuò)_queue.Empty |
| q.get(timeout=5) | 取數(shù)據(jù)時(shí)可以最多等待5秒,如果5秒后仍然沒(méi)數(shù)據(jù)則報(bào)錯(cuò)_queue.Empty |
| q.qsize() | 返回隊(duì)列已有數(shù)據(jù)量,int |
| q.empty() | 返回隊(duì)列是否為空,空為True |
| q.full() | 返回列隊(duì)是否已滿,滿為True |
| q.task_done() | 告訴隊(duì)列,該任務(wù)已處理完成 |
| q.join | 阻塞隊(duì)列。當(dāng)隊(duì)列添加新數(shù)據(jù)時(shí),任務(wù) +1,當(dāng)調(diào)用task_done(),任務(wù) -1,當(dāng)計(jì)數(shù)=0 join() 解除阻塞 |
| q.queue | 得到當(dāng)前隊(duì)列中的所有數(shù)據(jù) |
3??1??、queue.Queue 先進(jìn)先出 FIFO
參數(shù):maxsize = int,用于設(shè)置可以放入隊(duì)列的數(shù)據(jù)上線。當(dāng)達(dá)到這個(gè)大小的時(shí)候,插入操作將阻塞至隊(duì)列中的項(xiàng)目被消費(fèi)掉。如果 maxsize 默認(rèn) 等于零,隊(duì)列則為無(wú)窮大。(解釋:maxsize 是設(shè)置倉(cāng)庫(kù)的大小,可以容納多少商品,當(dāng)倉(cāng)庫(kù)塞滿后,后面要加進(jìn)來(lái)的商品就會(huì)在倉(cāng)庫(kù)外面排隊(duì),有空間了才會(huì)再進(jìn)來(lái)。)
import queueq = queue.Queue() # 創(chuàng)建隊(duì)列,不設(shè)置 maxsize,默認(rèn)無(wú)窮大 for i in range(4):q.put(i) # 往隊(duì)列中加數(shù)據(jù)for i in range(4):print(q.get()) # 從隊(duì)列中取數(shù)據(jù)# 加進(jìn)入的順數(shù)是0、1、2、3,取出來(lái)的順序也是0、1、2、3 0 1 2 3
3??2??、queue.Queue 先進(jìn)先出 LIFO
import queue
q = queue.LifoQueue()
for i in range(4):q.put(i)
for i in range(4):print q.get()# 加進(jìn)入的順數(shù)是0、1、2、3,取出來(lái)的順序是3、2、1、0
3
2
1
0
3??3??、queue.PriorityQueue 優(yōu)先級(jí)隊(duì)列
import queue# 示例1 。 正常添加到隊(duì)列中。
q = queue.PriorityQueue()
q.put_nowait((0, '123', ['aaa', 'eee'], 0))
q.put_nowait((0, '456', ['bbb'], 0))# 示例2。 報(bào)錯(cuò)!
q.put_nowait((0, '123', {"name": 'aaa', "age": 12}, 0))
q.put_nowait((0, '456', {"name": 'bbb'}, 0))
示例2報(bào)錯(cuò)內(nèi)容 :
TypeError: ‘<’ not supported between instances of ‘dict’ and ‘list’。
“dict”和“l(fā)ist” 之間無(wú)法進(jìn)行數(shù)據(jù)比較。
PriorityQueue的正確使用方式,應(yīng)該是如下兩種,使用tuple的第一個(gè)元素作為優(yōu)先級(jí)數(shù)字,或者自定義類中重定義__lt__方法,使得類實(shí)例能夠相互比較。
import queue# 示例3。插入的tuple中,index=0的值代表優(yōu)先級(jí),index=1的值是數(shù)據(jù) q = queue.PriorityQueue() q.put_nowait((0, {'name': 'aaa'})) q.put_nowait((1, {'name', 'bbbb'})) # 示例4: import queueclass Task(object):def __init__(self, priority, name):self.priority = priorityself.name = namedef __str__(self):return f'Task(priority={self.priority}, name={self.name})'def __lt__(self, other):""" 定義<比較操作符。 """return self.priority < other.priorityq = queue.PriorityQueue() # 自定義的類定義了__lt__, 可以比較大小 q.put_nowait(Task(3, "task1")) q.put_nowait(Task(1, "task2")) print(q.get()) print(q.get())返回結(jié)果: Task(priority=1, name=task2) Task(priority=3, name=task1)
4??、使用theading和queue 實(shí)操爬蟲
獲取堆糖網(wǎng)圖片
https://www.duitang.com/search/?kw=%E7%BE%8E%E5%A5%B3&type=feed
第一步、獲取圖片信息
第二步、下載圖片
代碼中有超詳細(xì)的注釋。請(qǐng)直接copy代碼到IDE中查看或運(yùn)行。
import time import requests # 網(wǎng)絡(luò)請(qǐng)求庫(kù) import threading # 多線程庫(kù) from queue import Queue # 先進(jìn)先出的隊(duì)列 from tqdm import tqdm # 進(jìn)度條庫(kù) import re # 正則表達(dá)式 import os""" 獲取堆糖網(wǎng)美女圖片 https://www.duitang.com/search/?kw=%E7%BE%8E%E5%A5%B3&type=feed """ q = Queue() # 實(shí)例化一個(gè)隊(duì)列,不指定最大長(zhǎng)度。即無(wú)限長(zhǎng)。def GetImgUrl(page):"""生產(chǎn)者的實(shí)際工作內(nèi)容。生產(chǎn)每個(gè)網(wǎng)頁(yè)上的圖片信息。 URL和給圖片命名的信息傳入q 隊(duì)列:param page: int, 需要爬取多少頁(yè)的圖片信息"""param = {'kw': '美女','after_id': str(24 * page), # 一頁(yè)24條圖片數(shù)據(jù),所以這里的值是 24*page'type': 'feed','include_fields': 'top_comments,is_root,source_link,item,buyable,root_id,status,like_count,like_id,sender,album,reply_count,favorite_blog_id','_type': '','_': f'{timeint}{100 + page}'} # 時(shí)間戳 + 最后3位數(shù)100是隨便給的,只要隨著翻頁(yè)遞增即可。url = f"https://www.duitang.com/napi/blogv2/list/by_search/"# 返回的結(jié)果中包含了我們需要下載的圖片地址resp = requests.get(url, params=param, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'})# 預(yù)創(chuàng)建一個(gè)正則表達(dá)式,取圖片地址、 ID 和 上傳者的名字。后續(xù)將名字+id給圖片命名RE = re.compile(r'"path":"(?P<url>.*?)","size":.*?"id":(?P<id>\d+),"sender".*?"username":"(?P<name>.*?)"', re.S)img_url = RE.finditer(resp.text)# 將 圖片地址和圖片ID丟到隊(duì)列里for _j in img_url:q.put([_j.group('url'), _j.group('id'), _j.group('name')])# tqdm.write(f"{_j.group('url')}, {_j.group('id')}, {_j.group('name')}") # 等同于print# 完成任務(wù),拿到圖片數(shù)據(jù),解鎖一個(gè)生產(chǎn)者線程ProducerMaximumThread.release()def downloadImg(imgurl, imgid, imgname):"""消費(fèi)者的工作,通過(guò)隊(duì)列獲取圖片信息,并開始下載圖片。:param imgurl: str , 圖片的URL地址:param imgid: str, 圖片的ID 用于給圖片命名。圖片名稱 = imgname+imgid.jpg:param imgname: str, 圖片上傳者的名字,用于給圖片命名。圖片名稱 = imgname+imgid.jpg:return:"""resp = requests.get(imgurl)# 二進(jìn)制方式寫文件。 等于保存圖片操作with open(f'{pic_path}\\{imgname}{imgid}.jpg', 'wb') as f:f.write(resp.content) # 二進(jìn)制寫入# 隊(duì)列任務(wù)完成,返回結(jié)果q.task_done()# 消費(fèi)者下載圖片完成,解鎖一個(gè)線程ConsumerMaximumThread.release()def friststep():"""給生產(chǎn)者 安排任務(wù),獲取圖片信息,url 和 給圖片命名的數(shù)據(jù)"""with tqdm(range(page + 1), desc='獲取圖片地址') as tbar1: # 創(chuàng)建動(dòng)作條,實(shí)例化# 和正常循環(huán)一樣,只是額外增加了進(jìn)度條for _i in tbar1:# 設(shè)置每個(gè)循環(huán)中進(jìn)度條展示的動(dòng)態(tài)信息tbar1.set_postfix(當(dāng)前頁(yè)碼=_i, 總頁(yè)數(shù)=page, 已有列隊(duì)數(shù)=q.qsize(), 當(dāng)前激活線程=threading.active_count())# 上鎖,限制生產(chǎn)者的線程數(shù)量。timeout=5 設(shè)置鎖的最大時(shí)間。避免特殊情況導(dǎo)致堵塞ProducerMaximumThread.acquire(timeout=5)# 給多線程安排任務(wù),并激活線程。t = threading.Thread(target=GetImgUrl, args=(_i,), daemon=True)t.start()def secondstep():"""給消費(fèi)者 安排任務(wù),下載圖片"""plan = q.qsize()with tqdm(range(plan), desc='正在下載圖片') as tbar2: # 創(chuàng)建進(jìn)度條# 和正常循環(huán)一樣,只是額外增加了進(jìn)度條for _i in tbar2:# 設(shè)置每個(gè)循環(huán)中進(jìn)度條展示的動(dòng)態(tài)信息tbar2.set_postfix(已下載=_i, 總數(shù)=plan, 列隊(duì)任務(wù)剩余=q.qsize(), 當(dāng)前激活線程=threading.active_count())# 從隊(duì)列中取數(shù)據(jù),設(shè)置超時(shí)時(shí)間,編碼數(shù)據(jù)空了后直接報(bào)錯(cuò)imgurl, imgid, imgname = q.get(timeout=3)# 上鎖。 限制消費(fèi)者的線程數(shù)量。timeout=5 設(shè)置鎖的最大時(shí)間ConsumerMaximumThread.acquire(timeout=5)# 給消費(fèi)者多線程安排任務(wù),并激活線程。 daemon=True 主線程結(jié)束,子線程也結(jié)束t = threading.Thread(target=downloadImg, args=(imgurl, imgid, imgname), daemon=True)t.start()if __name__ == '__main__':pic_path = r'E:\堆糖圖片' # 下載圖片的存放路徑# 判斷文件夾 是否存在if not os.path.exists(pic_path):# 如果不存在那么就創(chuàng)建文件夾。os.mkdir(pic_path)# 設(shè)置需要爬多少頁(yè)圖片,每頁(yè)24張。 控制在50以內(nèi)page = 10# 設(shè)置生產(chǎn)者(爬數(shù)據(jù))最大線程數(shù)量。獲取圖片URL地址的最大線程數(shù)量ProducerMaximumThread = threading.BoundedSemaphore(3)# 設(shè)置消費(fèi)者(下載數(shù)據(jù))最大線程數(shù)量。下載圖片保存到指定文件夾ConsumerMaximumThread = threading.BoundedSemaphore(8)# 獲取到當(dāng)前時(shí)間戳,去掉小數(shù)點(diǎn)timeint = int(time.time())# 執(zhí)行生產(chǎn)者任務(wù),獲取圖片路徑friststep()# 消費(fèi)者模式,下載圖片保存到指定文件夾secondstep()總結(jié)
以上是生活随笔為你收集整理的Python多线程篇一,theanding库、queue队列、生产者消费者模式爬虫实战代码超详细的注释、自动分配线程对应多任务,GIF演示【傻瓜式教程】的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Go语言自学系列 | golang并发编
- 下一篇: websocket python爬虫_p