日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Python 多线程总结(2)— 线程锁、线程池、线程数量、互斥锁、死锁、线程同步

發布時間:2023/11/28 生活经验 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 多线程总结(2)— 线程锁、线程池、线程数量、互斥锁、死锁、线程同步 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

主要介紹使用 threading 模塊創建線程的 3 種方式,分別為:

  • 創建 Thread 實例函數
  • 創建 Thread 實例可調用的類對象
  • 使用 Thread 派生子類的方式

多線程是提高效率的一種有效方式,但是由于 CPython 解釋器中存在 GIL 鎖,因此 CPython 中的多線程只能使用單核。也就是說 Python 的多線程是宏觀的多線程,而微觀上實際依舊是單線程。

線程和進程之間有很多相似的地方,它們都是一個獨立的任務。但是相比進程,線程要小的多。我們運行線程需要在進程中進行,而且線程和線程之間是共享內存的。相比進程的數據隔離,線程的安全性要更差一些。

1. Thread 實例-函數

使用 threading 模塊創建一個 Thread 的實例,傳遞給它一個函數。

import threading
import timeloops = [4, 2]def loop(nloop, nsec):print 'start loop', nloop, 'at:', time.ctime()time.sleep(nsec)print 'end loop', nloop, 'at:', time.ctime()def main():print 'start main at:', time.ctime()threads = []nloops = range(len(loops))
# 實例化Thread即調用Thread()與調用start_new_thread()最大區別是:新的線程不會立即開始for i in nloops:t = threading.Thread(target=loop, args=(i, loops[i]))threads.append(t)# t.daemon = True     python主程序只有在沒有非守護線程的時候才會退出,設置# 線程是否隨主線程退出而退出,默認為False# 所有線程創建之后,一起調用start()函數啟動,而不是創建一個調用一個for i in nloops:threads[i].start() # join()會等到線程結束,或者在給了timeout參數的時候,等到超時為止
# join() 的作用是讓主線程等待直到該線程執行完for i in nloops:threads[i].join()   print 'end main at:', time.ctime()if __name__ == "__main__":main()

代碼輸出如下:

'''
start main at: Sat Jul 21 22:27:35 2018
start loop 0 at: Sat Jul 21 22:27:35 2018
start loop 1 at: Sat Jul 21 22:27:35 2018
end loop 1 at: Sat Jul 21 22:27:37 2018
end loop 0 at: Sat Jul 21 22:27:39 2018
end main at: Sat Jul 21 22:27:39 2018
'''

2. Thread 實例-可調用的類對象

創建一個 Thread 的實例,傳遞給它一個可調用的類對象

import threading
import timeloops = [4, 2]class ThreadFun(object):def __init__(self, func, args, name=''):self.name = nameself.func = funcself.args = argsdef __call__(self):apply(self.func, self.args)def loop(nloop, nsec):print 'start loop', nloop, 'at:', time.ctime()time.sleep(nsec)print 'end loop', nloop, 'at:', time.ctime()def main():print 'main is start at:', time.ctime()threads = []nloops = range(len(loops))for i in nloops:t = threading.Thread(target=ThreadFun(loop, (i, loops[i]), loop.__name__))# 該類在調用函數方面更加通用,并不局限于loop()函數threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print 'main is end at:', time.ctime()if __name__ == "__main__":main()

3. Thread 派生子類

Thread 派生出一個子類,創建一個這個子類的實例

import threading
import timeloops = [4, 2]class MyThread(threading.Thread):def __init__(self, func, args, name=''):# super().__init__(name=name)	# # 線程的名字threading.Thread.__init__(self)self.name = nameself.func = funcself.args = args# run 等同于之前 target 指定的函數def run(self):apply(self.func, self.args)def test(self):print("this is test")def loop(nloop, nsec):print 'start loop', nloop, 'at:', time.ctime()time.sleep(nsec)print 'end loop', nloop, 'at:', time.ctime()def main():print 'main is start at:', time.ctime()threads = []nloops = range(len(loops))for i in nloops:# 類似于 Thread(target=函數名) , 只會創建出一個線程t = MyThread(loop, [i, loops[i]], loop.__name__)	threads.append(t)# MyThread 類中沒有 start 方法,繼承的父類 調用 start 方法,會自動調用 run 方法for i in nloops:threads[i].start()for i in nloops:threads[i].join()print 'main is end at:', time.ctime()t.test()        # 這種方式不是多線程的方式!!!要在 run 方法里面調用 test 方法,才是多任務的方式
if __name__ == "__main__":main()

大家在用面向對象的方式,要注意類中除了 run 方法外,其他的方法,通過類的實例化去調用并不是多線程的方式。

除了用函數的方式,我們還可以用面向對象的方式來創建線程。這就需要我們手動繼承 Thread 類,而且還需要實現其中的 run 方法,代碼如下:

import time
from threading import Threadclass MyThread(Thread):def __init__(self):super().__init__()def run(self):time.sleep(1)print("我在運行")t = MyThread()
t.start()
print("我是主線程")

4. 使用線程鎖來解決資源競爭

import threadinglock = threading.Lock()
some_var = 0class IncrementThread(threading.Thread):def run(self):global some_varlock.acquire()  #read_value = some_varprint "some_var in %s is %d" % (self.name, read_value)some_var = read_value + 1print "some_var in %s after increment is %d" % (self.name, some_var)lock.release()def use_increment_thread():threads = []for i in range(50):t = IncrementThread()threads.append(t)t.start()for t in threads:t.join()print "After 50 modifications, some_var should have become 50"print "After 50 modifications, some_var is %d" % (some_var,)if __name__ == "__main__":use_increment_thread()

這里需要注意一點,我們兩個函數/進程使用的是同一把鎖,如果我們使用不同的鎖還是會出現數據不安全的問題。

5. 線程池

池是用來保證計算機硬件安全的情況下,最大限度地利用計算機,它降低了程序的運行效率,但是保證了計算機硬件的安全,從而讓你寫的程序能夠正常運行。

  • 同步:提交任務之后原地等待任務的返回結果,期間不做任何事
  • 異步:提交任務之后不等待任務的返回結果,執行繼續往下執行

ThreadPoolExecutor 讓線程的使用更加簡單方便,減小了線程創建/銷毀的資源損耗,無需考慮線程間的復雜同步,方便主線程與子線程的交互。

from concurrent.futures import ThreadPoolExecutor
import timedef get_html(times):time.sleep(times)print("get page {} success".format(times))return timesexecutor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_html,(2))
task2 = executor.submit(get_html,(3))#done方法用來判斷某個人物是否完成
print(task1.done())
time.sleep(5)
print(task2.done())
print(task1.cancel()
#result方法可以獲取task返回值
print(task1.result())

線程池是從 Python 3.2 才被加入標準庫中的 concurrent.futures 模塊,相比 threading 模塊,該模塊通過 submit 返回的是一個 future 對象,通過它可以獲取某一個線程的任務執行狀態或返回值,另外futures 可以讓多線程和多進程的編碼接口一致,

from concurrent.futures import ThreadPoolExecutor
import time# 括號內可以傳數字 不傳的話默認會開設當前計算機 cpu 個數進程
pool = ThreadPoolExecutor(5)    # 池子里面固定只有五個線程
"""
池子造出來之后 里面會固定存在五個線程
這個五個線程不會出現重復創建和銷毀的過程
"""
def task(n):print(n)time.sleep(2)return n**n# pool.submit(task, 1)    # 朝池子中提交任務  異步提交
# print("主")def call_back(n):    # 回調處理數據的函數print('call_back>>>:',n.result())    # obj.result() 拿到的就是異步提交的任務的返回結果t_list = []
for i in range(10):res = pool.submit(task, i)# print(res.result())   # result 方法   同步提交# res = pool.submit(task, i).add_done_callback(call_back)# 將 res 返回的結果 <Future at 0x100f97b38 state=running>,交給回電函數 call_back 處理# 即 res 做實參傳給 call_back 函數t_list.append(res)# 等待線程池中所有的任務執行完畢之后再繼續往下執行
pool.shutdown()     # 關閉線程池  等待線程池中所有的任務運行完畢
for t in t_list:print(">>>", t.result())

因為開啟線程需要消耗一些時間,所以有時候我們會使用線程池來減少開啟線程花費的時間。線程池的操作定義在 concurrent.futures.ThreadPoolExecutor 類中,下面我們來看看線程池如何使用:

import time
import threading
from concurrent.futures import ThreadPoolExecutordef func1():print(threading.current_thread().name, 'is running')def func2():for i in range(3):time.sleep(1)print(threading.current_thread().name, 'is running')pool = ThreadPoolExecutor(max_workers=2)
t1 = pool.submit(func2)
t2 = pool.submit(func1)

在代碼中我們創建了一個容量為 2 的線程池,我們調用 pool.submit 函數就能使用線程池中的線程了。
?

總結

  • 池子一旦造出來后,固定了線程或進程。
  • 線程不會再變更,所有的任務都是這些線程處理。 這些線程不會再出現重復創建和銷毀的過程。
  • 任務的提交是異步的,異步提交任務的返回結果,應該通過回調機制來獲取。
  • 回調機制就相當于,把任務交給一個員工完成,它完成后主動找你匯報完成結果。

6. 查看線程數量

查看線程數量是通過 threading.enumerate() 方法來查看的。

import threading
import timedef test1():for i in range(5):print("--test1--%d"%i)time.sleep(1)def test2():for i in range(5):print("--test2--%d"%i)time.sleep(1)def main():t1 = threading.Thread(target=test1, name="t1")t2 = threading.Thread(target=test2)t1.start()t2.start()# 獲取當前程序所有的線程print(threading.enumerate())if __name__ == "__main__":main()

輸出結果:

--test1--0
--test2--0
[<_MainThread(MainThread, started 140076707002112)>, <Thread(t1, started 140076670510848)>, <Thread(Thread-1, started 140076662118144)>]
--test1--1
--test2--1
--test1--2
--test2--2
--test2--3
--test1--3
--test2--4
--test1--4

如果多次運行,會發現打印的順序并不是一致的。因為線程的運行時沒有先后順序的,誰先搶到資源就先執行誰。

7. 線程其它方法

import os
import threading
from threading import active_count, current_thread
import timedef task():print("hello")print(os.getpid())print(current_thread().name)time.sleep(1)if __name__ == '__main__':t1 = threading.Thread(target=task, name="t1")t2 = threading.Thread(target=task, name="t2")t1.start()t1.join()    # 等待線程執行結果后,主線程繼續執行t2.start()print(os.getpid())                  # 進程 IDprint(current_thread().name)        # 獲取線程名字print(active_count())       # 統計當前正在活躍的線程數量
  • join() :等待線程執行結果后,主線程繼續執行
  • os.getpid() :進程 ID
  • current_thread().name :獲取線程名字
  • active_count() :統計當前正在活躍的線程數量

8. 多個線程同時修改全局變量

import threading
import timenum = 0def test1(nums):global numfor i in range(nums):num += 1print("test1----num=%d" % num)def test2(nums):global numfor i in range(nums):num += 1print("test2----num=%d" % num)def main():t1 = threading.Thread(target=test1, args=(1000000,))t2 = threading.Thread(target=test2, args=(1000000,))t1.start()t2.start()time.sleep(5)print("main-----num=%d" % num)if __name__ == "__main__":main()

輸出結果如下,當參數 args 變小時不會出現下面這種問題。

test1----num=1177810
test2----num=1476426
main-----num=1476426

當我們的線程 1 到 CPU 中執行代碼 num+=1 的時候,其實這一句代碼要被拆分為 3 個步驟來執行:

  • 第一步:獲取 num 的值
  • 第二步:把獲取的值 +1 操作
  • 第三步:把第二步獲取的值存儲到 num 中

我們在 CPU 中執行這三步的時候,并不能保證這三部一定會執行結束,再去執行線程 2 中的代碼。
因為這是多線程的,所以 CPU 在處理兩個線程的時候,是采用雨露均沾的方式,可能在線程一剛剛將 num+1 還沒來得及將新值賦給 num 時,就開始處理線程二了,因此當線程二執行完全部的 num+=1 的操作后,可能又會開始對線程一的未完成的操作,而此時的操作停留在了完成運算未賦值的那一步,因此在完成對 num 的賦值后,就會覆蓋掉之前線程二對 num+1 操作。

那我們應該怎么解決這個問題?這就要用到我們接下來的知識——鎖。
?

9. 互斥鎖

當多個線程幾乎同時修改某一個共享數據的時候,需要進行同步控制。線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。
?
互斥鎖為資源引入一個狀態——鎖定/非鎖定。
?
某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能改變,直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
?
基本使用:

#創建鎖
mutex = threading.Lock()#鎖定(加鎖)
mutex.acquire()#解鎖
mutex.release()

互斥鎖解決資源競爭

import threading
import timenum = 0
# 創建一個互斥鎖,默認是沒有上鎖的
mutex = threading.Lock()def test1(nums):global nummutex.acquire()for i in range(nums):num += 1mutex.release()print("test1----num=%d"%num)def test2(nums):global nummutex.acquire()for i in range(nums):num += 1mutex.release()print("test1----num=%d" % num)def main():t1 = threading.Thread(target=test1,args=(1000000,))t2 = threading.Thread(target=test2,args=(1000000,))t1.start()t2.start()time.sleep(2)print("main-----num=%d" % num)if __name__ == "__main__":main()

此時輸出的結果是沒有問題的。互斥鎖也會引發一個問題,就是死鎖。

10. 死鎖

當多個線程幾乎同一 時間的去修改某個共享數據的時候就需要我們進行同步控制,線程同步能夠保證多個線程安全的訪問競爭資源,我們最簡單的就是引入互斥鎖 Lock、遞歸鎖 RLock。這兩種類型的鎖有一點細微的區別,

像下面這種情況,就容易出現死鎖。互相鎖住了對方,又在等對方釋放資源。

import threading  #Lock對象  
lock = threading.Lock()
#A 線程
lock.acquire(a)
lock.acquire(b)#B 線程
lock.acquire(b)
lock.acquire(a)

當線程調用 lock 對象的 acquire() 方法時,lock 就會進入鎖住狀態,如果此時另一個線程想要獲得這個鎖,該線程就會變為阻塞狀態,因為每次只能有一個線程能夠獲得鎖,直到擁有鎖的線程調用 lockrelease() 方法釋放鎖之后,線程調度程序從處于同步阻塞狀態的線程中選擇一個來獲得鎖,并使得該線程進入運行狀態。

這種情況比較容易被發現,還有一種情況不太容易被發現,調用其他加鎖函數,也可能造成死鎖。

def add(lock):global totalfor i in range(100000):lock.acquire()task()total += 1lock.release()def task():lock.acquire()# do somethinglock.release()

避免死鎖:

  • 程序設計上盡量避免
  • 添加超時時間
import threading  #RLock對象  
rLock = threading.RLock() 
rLock.acquire()  
#在同一線程內,程序不會堵塞。
rLock.acquire()  
rLock.release()  
rLock.release()

RLock 允許在同一線程中被多次 acquire ,如果出現 Rlock ,那么 acquirerelease 必須成對出現,即調用了 iacquire ,必須調用 i 次的 release 才能真正釋放所占用的鎖。

11. 線程同步

11.1 condition 條件變量

condition (條件變量):condition 有兩把鎖,一把底層鎖會在線程底層調用 wait 后釋放。我們每次調用 wait 時候回分配一把鎖放到 condition 的等待隊列中等待 notify 方法的喚醒。

import  threading
class factory(threading.Thread):def __init__(self,cond):super(factory,self).__init__(name="口罩生產廠家")self.cond = conddef run(self):with self.cond:self.cond.wait()print("{}:生產了10萬個口罩,快來拿".format(self.name))self.cond.notify()self.cond.wait()print("{}:又生產了100萬個口罩發往武漢".format(self.name))self.cond.notify()self.cond.wait()print("{}:加油,武漢!".format(self.name))self.cond.notify()class wuhan(threading.Thread):def __init__(self,cond):super(wuhan,self).__init__(name="武漢志愿者")self.cond = conddef run(self):with self.cond:print("{}:能幫我們生產一批口罩嗎?".format(self.name))self.cond.notify()self.cond.wait()print("{}:謝謝你們".format(self.name))self.cond.notify()self.cond.wait()print("{}:一起加油".format(self.name))self.cond.notify()self.cond.wait()if __name__=="__main__":lock = threading.Condition()factory = factory(lock)wuhan = wuhan(lock)factory.start()wuhan.start()

上面的代碼,大家看到我用到 with 語句,這是因為 Condition 源碼中實現了 __enter____exit__,類中實現了這兩個方法,就可以用 with 語句。而且 __enter__ 調用了 acquire() 方法,在 __exit__ 方法中調用了 release() 方法。

 def __enter__(self):return self._lock.__enter__()def __exit__(self, *args):return self._lock.__exit__(*args)

11.2 semaphore 信號對象

semaphore (信號對象):用于控制進入數量的鎖,Semaphore 對象管理著一個計數器,當我們每次調用 acquire() 方法的時候會進行遞減,而每個 release() 方法調用遞增,計數器永遠不會低于零,當 acquire() 發現計數器為零的時候線程阻塞等待其他線程調用 release() ,具體如一下示例:

import threading
import timeclass HtmlSpider(threading.Thread):def __init__(self, url, sem):super().__init__()self.url = urlself.sem = semdef run(self):time.sleep(2)print("got html text success")self.sem.release()class UrlProducer(threading.Thread):def __init__(self, sem):super().__init__()self.sem = semdef run(self):for i in range(20):self.sem.acquire()html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)html_thread.start()if __name__ == "__main__":sem = threading.Semaphore(3)url_producer = UrlProducer(sem)url_producer.start()

12. 線程間通信

PythonQueue 模塊中提供了以下幾種隊列類:

  • FIFO(先入先出) 隊列 Queue
  • LIFO(后入先出)隊列 LifoQueue
  • 優先級隊列 Priority Queue

一般我們可以使用隊列來實現線程同步,在開發中 FIFO 隊列我們使用的比較多,下面我將用一個例子說明:

from threading import Thread
from time import sleep
from queue import Queue#生產者
def Producer():count =0while True:if queue.qsize()<1000:for i in range(100):count +=1msg = "生產商品"+str(count)queue.put(msg)print(msg)sleep(0.5)#消費者
def Consumer():while True:if queue.qsize()>100:for i in range(3):msg = "消費者消費了"+queue.get()print(msg)sleep(1)if __name__=="__main__":#定義一個隊列queue = Queue();#初始化商品
for i in range(500):queue.put("初始商品"+str(i))#生產商品for i in range(4):p = Thread(target=Producer)p.start()#消費商品for i in range(10):c = Thread(target=Consumer)c.start()

隊列對象(Queue、LifoQueue 或者 PriorityQueue)提供下列描述的公共方法。

  • Queue.qsize()
    返回隊列的大致大小。注意,qsize()> 0 不保證后續的 get() 不被阻塞,qsize() < maxsize 也不保證 put() 不被阻塞。

  • Queue.empty()
    如果隊列為空,返回 True,否則返回 False。如果 empty() 返回 True,不保證后續調用的 put() 不被阻塞。類似的,如果 empty() 返回 False,也不保證后續調用的 get() 不被阻塞。

  • Queue.full()
    如果隊列是滿的返回 True,否則返回 False。如果 full() 返回 True 不保證后續調用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證后續調用的 put() 不被阻塞。
    ?

  • Queue.put(item, block=True, timeout=None)
    將 item 放入隊列。如果可選參數 block 是 true 并且 timeout 是 None(默認),則在必要時阻塞至有空閑插槽可用。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間沒有可用的空閑插槽,將引發 Full 異常。反之(block 是 false),如果空閑插槽立即可用,則把 item 放入隊列,否則引發 Full 異常(在這種情況下,timeout 將被忽略)。

  • Queue.put_nowait (item)
    相當于 put(item, False)。

  • Queue.get(block=True, timeout=None)
    從隊列中移除并返回一個項目。如果可選參數 block 是 true 并且 timeout 是 None(默認值),則在必要時阻塞至項目可得到。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間內項目不能得到,將引發 Empty 異常。反之(block 是 false),如果一個項目立即可得到,則返回一個項目,否則引發 Empty 異常(這種情況下,timeout 將被忽略)。
    ?
    POSIX 系統 3.0 之前,以及所有版本的 Windows 系統中,如果 block 是 true 并且 timeout 是 None,這個操作將進入基礎鎖的不間斷等待。這意味著,沒有異常能發生,尤其是 SIGINT 將不會觸發 KeyboardInterrupt 異常。
    ?

  • Queue.get_nowait()
    相當于 get(False)。提供了兩個方法,用于支持跟蹤排隊的任務是否被守護的消費者線程完整的處理。

  • Queue.task_done()
    表示前面排隊的任務已經被完成。被隊列的消費者線程使用。每個 get() 被用于獲取一個任務,后續調用 task_done() 告訴隊列,該任務的處理已經完成。
    ?
    如果 join() 當前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個 put() 進隊列的條目的 task_done() 都被收到)。 如果被調用的次數多于放入隊列中的項目數量,將引發 ValueError 異常。
    ?

  • Queue.join()
    阻塞至隊列中所有的元素都被接收和處理完畢。
    ?
    在多線程通信中,Queue 扮演者重要的角色,一般添加數據到隊列使用 put() 方法,在隊列中取數據使用 get() 方法,后面針對 Queue 還會做進一步的講解

其它參考
https://segmentfault.com/a/1190000014306740
一篇帶你熟練使用多線程與原理

總結

以上是生活随笔為你收集整理的Python 多线程总结(2)— 线程锁、线程池、线程数量、互斥锁、死锁、线程同步的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。