日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发编程.md

發布時間:2024/4/14 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发编程.md 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

操作系統基礎

  • 人機矛盾: CPU利用率低

  • 磁帶存儲+批處理:降低數據的讀取時間,提高CPU的利用率

  • 多道操作系統------在一個任務遇到IO的時候主動讓出CPU,給其他任務使用

    • 由操作系統完成
    • 切換要需要時間
    多道技術:1.產生背景:針對單核,實現并發ps:現在的主機一般是多核,那么每個核都會利用多道技術有4個cpu,運行于cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個cpu中的任意一個,具體由操作系統調度算法決定。2.空間上的復用:如內存中同時有多道程序3.時間上的復用:復用一個cpu的時間片強調:遇到io切,占用cpu時間過長也切,核心在于切之前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基于上次切走的位置繼續運行
  • 分時操作系統-------給時間分片,讓多個任務輪流使用CPU

    • 短作業優先算法
    • 先來先服務算法

    每個程序分配一個時間片,輪轉使用CPU,切換需要時間,降低CPU利用率,提高用戶體驗

  • 通用操作系統-------分時操作系統 + 多道操作系統 + 實時操作系統

    • 多個程序一起在計算機中執行
    • 一個程序如果遇到IO操作,切出去讓出CPU
    • 一個程序沒有遇到IO,但是時間片到時了,切出去讓出CPU
  • 操作系統負責什么?

    調度進程先后執行的順序 控制執行的時間等等
    資源的分配

進程的概念

進程:

  • 運行中的程序
  • 是計算機中最小的資源分配單位
  • 在操作系統中唯一標識符:PID
進程和程序的區別: 程序只是一個文件 進程是這個文件被CPU運行起來了

操作系統調度進程的算法:

  • 短作業優先
  • 先來先服務
  • 時間片輪轉
  • 多級反饋算法

并行與并發:

  • 并行:并行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核的CPU )
  • 并發:并發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A ,交替使用,目的是提高效率。

  • 就緒(Ready)狀態

    當進程已分配到除CPU以外的所有必要的資源,只要獲得處理機便可立即執行,這時的進程狀態稱為就緒狀態。

  • 執行/運行(Running)

    狀態當進程已獲得處理機,其程序正在處理機上執行,此時的進程狀態稱為執行狀態。

  • 阻塞(Blocked)狀態

    正在執行的進程,由于等待某個事件發生而無法執行時,便放棄處理機而處于阻塞狀態。引起進程阻塞的事件可有多種,例如,等待I/O完成、申請緩沖區不能滿足、等待信件(信號)等。

同步異步:

所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。

所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了。至于被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列。

在python程序中的進程操作

進程:

# 創建進程 時間開銷大 # 銷毀進程 時間開銷大 # 進程之間切換 時間開銷大

線程:

線程是進程的一部分,每個進程中至少有一個線程 能被CPU調度的最小單位 一個進程中的多個線程是可以共享這個進程的數據的 —— 數據共享 線程的創建、銷毀、切換 開銷遠遠小于進程 —— 開銷小

進程:是計算機中最小的資源分配單位(進程是負責圈資源)

線程:是計算機中能被CPU調度的最小單位 (線程是負責執行具體代碼的)

os.getpid():獲取當前進程pid

os.getppid():獲取父級進程pid,可以創建子進程,在pycharm中啟動的所有py程序都是pycharm的子進程

import os import time from multiprocessing import Process # multiprocessing多進程模塊Process類 def func():print('start',os.getpid())time.sleep(1)print('end',os.getpid())if __name__ == '__main__':p = Process(target=func) # 將函數封裝到類,創建一個要開啟func進程的對象p.start() # 異步 調用開啟進程的方法 但是并不等待這個進程真的開啟print('main :',os.getpid()) #main : 11436 #start 9860 #end 9860 操作系統創建進程的方式不同 windows操作系統執行開啟進程的代碼實際上新的子進程需要通過import父進程的代碼來完成數據的導入工作所以有一些內容我們只希望在父進程中完成,就寫在if __name__ == '__main__':下面 ios linux操作系統創建進程 fork,拷貝的方式
  • 主進程和子進程之間的關系

    父進程會等待著所有的子進程結束之后才結束,為了回收資源

主進程代碼執行完畢:# 主進程負責回收子進程的資源# 如果子進程執行結束,父進程沒有回收資源,那么這個子進程會變成一個僵尸進程 # 主進程的結束邏輯# 主進程的代碼結束# 所有的子進程結束# 給子進程回收資源# 主進程結束# 主進程怎么知道子進程結束了的呢?# 基于網絡、文件
  • join方法 :阻塞父進程,直到對應子進程結束就結束
import time from multiprocessing import Process def send_mail():time.sleep(3)print('發送了一封郵件') if __name__ == '__main__':p = Process(target=send_mail)p.start() # 異步 非阻塞# time.sleep(5)print('join start')p.join() # 同步 阻塞 直到p對應的進程結束之后才結束阻塞print('5000封郵件已發送完畢') #join start #發送了一封郵件 #5000封郵件已發送完畢 import time import random from multiprocessing import Process def send_mail(a):time.sleep(random.random())print('發送了一封郵件',a)if __name__ == '__main__':l = []for i in range(10):p = Process(target=send_mail,args=(i,))#向子進程傳參數,用元組p.start()l.append(p) #回收多個子進程資源,先添指列表,最后統一處理for p in l:p.join()# 阻塞 直到上面的十個進程都結束print('5000封郵件已發送完畢') 發送了一封郵件 5 發送了一封郵件 4 發送了一封郵件 3 ...... 5000封郵件已發送完畢

補充:Windows開啟進程,由于創建機制,必須采用此方式.

print([__name__]) if __name__ == '__main__':# 控制當這個py文件被當作腳本直接執行的時候,就執行這里面的代碼# 當這個py文件被當作模塊導入的時候,就不執行這里面的代碼print('hello hello') # __name__ == '__main__'# 執行的文件就是__name__所在的文件 # __name__ == '文件名'# __name__所在的文件被導入執行的時候
  • 守護進程

    隨著主進程的代碼結束而結束的,所有的子進程都必須在主進程結束之前結束,由主進程來負責回收資源

    p.daemon = True

其他方法:

p.is_alive() 判斷進程是否活著p.terminate() # 可以解釋異步非阻塞, 關閉需要時間,并不等到返回結束進程結果,會變僵尸 def son1():while True:print('is alive')time.sleep(0.5)if __name__ == '__main__':p = Process(target=son1)p.start() # 異步 非阻塞print(p.is_alive())time.sleep(1)p.terminate() # 異步的 非阻塞print(p.is_alive()) # 進程還活著 因為操作系統還沒來得及關閉進程time.sleep(0.01)print(p.is_alive()) # 操作系統已經響應了我們要關閉進程的需求,再去檢測的時候,得到的結果是進程已經結束了

使用面向對象方式開啟進程

import os import time from multiprocessing import Processclass MyProcecss2(Process): #必須繼承Processdef run(self): #必須要有run方法,重寫process的run,start自動調用runwhile True:print('is alive')time.sleep(0.5)class MyProcecss1(Process):def __init__(self,x,y): #傳參數要定義init函數self.x = xself.y = ysuper().__init__() #要導入父類的初始化參數def run(self):print(self.x,self.y,os.getpid())for i in range(5):print('in son2')time.sleep(1)if __name__ == '__main__':mp = MyProcecss1(1,2)mp.daemon = True mp.start()print(mp.is_alive())mp.terminate()# mp2 = MyProcecss2()# mp2.start()# print('main :',os.getpid())# time.sleep(1)

Process操作進程的方法

# p.start() 開啟進程 異步非阻塞 # p.terminate() 結束進程 異步非阻塞 # p.join() 同步阻塞 # p.isalive() 獲取當前進程的狀態 # daemon = True 設置為守護進程,守護進程永遠在主進程的代碼結束之后自動結束

# 1.如果在一個并發的場景下,涉及到某部分內容# 是需要修改一些所有進程共享數據資源# 需要加鎖來維護數據的安全 # 2.在數據安全的基礎上,才考慮效率問題 # 3.同步存在的意義# 數據的安全性# 在主進程中實例化 lock = Lock() # 把這把鎖傳遞給子進程 # 在子進程中 對需要加鎖的代碼 進行 with lock:# with lock相當于lock.acquire()和lock.release() # 在進程中需要加鎖的場景# 共享的數據資源(文件、數據庫)# 對資源進行修改、刪除操作 # 加鎖之后能夠保證數據的安全性 但是也降低了程序的執行效率 mport time import json from multiprocessing import Process,Lockdef search_ticket(user):with open('ticket_count') as f:dic = json.load(f)print('%s查詢結果 : %s張余票'%(user,dic['count']))def buy_ticket(user,lock):# with lock:# lock.acquire() # 給這段代碼加上一把鎖time.sleep(0.02)with open('ticket_count') as f:dic = json.load(f)if dic['count'] > 0:print('%s買到票了'%(user))dic['count'] -= 1else:print('%s沒買到票' % (user))time.sleep(0.02)with open('ticket_count','w') as f:json.dump(dic,f)# lock.release() # 給這段代碼解鎖def task(user, lock):search_ticket(user)with lock:buy_ticket(user, lock)if __name__ == '__main__':lock = Lock()for i in range(10):p = Process(target=task,args=('user%s'%i,lock))p.start()

進程之間通信IPC

進程之間的通信 - IPC(inter process communication) 第三方:redis,memcache,kafka,rabbitmq 特點:并發需求,高可用,斷電保存數據,解耦 from multiprocessing import Queue,Process # 先進先出 def func(exp,q):ret = eval(exp)q.put({ret,2,3})q.put(ret*2)q.put(ret*4)if __name__ == '__main__':q = Queue()Process(target=func,args=('1+2+3',q)).start()print(q.get())print(q.get())print(q.get()) # Queue基于 天生就是數據安全的# 文件家族的socket pickle lock # pipe 管道(不安全的) = 文件家族的socket pickle # 隊列 = 管道 + 鎖 # from multiprocessing import Pipe # pip = Pipe() # pip.send() # pip.recv() import queue# from multiprocessing import Queue # q = Queue(5) # q.put(1) # q.put(2) # q.put(3) # q.put(4) # q.put(5) # 當隊列為滿的時候再向隊列中放數據 隊列會阻塞 # print('5555555') # try: # q.put_nowait(6) # 當隊列為滿的時候再向隊列中放數據 會報錯并且會丟失數據 # except queue.Full: # pass # print('6666666') # # print(q.get()) # print(q.get()) # print(q.get()) # 在隊列為空的時候會發生阻塞 # print(q.get()) # 在隊列為空的時候會發生阻塞 # print(q.get()) # 在隊列為空的時候會發生阻塞 # try: # print(q.get_nowait()) # 在隊列為空的時候 直接報錯 # except queue.Empty:pass

生產者消費者模型

什么是生產者消費者模型? # 把一個產生數據并且處理數據的過程解耦 # 讓生產的數據的過程和處理數據的過程達到一個工作效率上的平衡 # 中間的容器,在多進程中我們使用隊列或者可被join的隊列,做到控制數據的量# 當數據過剩的時候,隊列的大小會控制這生產者的行為# 當數據嚴重不足的時候,隊列會控制消費者的行為# 并且我們還可以通過定期檢查隊列中元素的個數來調節生產者消費者的個數

第一種方式:

import time import random from multiprocessing import Process,Queuedef producer(q,name,food):for i in range(10):time.sleep(random.random())fd = '%s%s'%(food,i)q.put(fd)print('%s生產了一個%s'%(name,food))def consumer(q,name):while True:food = q.get()if not food:breaktime.sleep(random.randint(1,3))print('%s吃了%s'%(name,food))def cp(c_count,p_count):q = Queue(10)for i in range(c_count):Process(target=consumer, args=(q, 'alex')).start()p_l = []for i in range(p_count):p1 = Process(target=producer, args=(q, 'wusir', '泔水'))p1.start()p_l.append(p1)for p in p_l:p.join()for i in range(c_count):q.put(None) if __name__ == '__main__':cp(2,3) 流程:消費者開啟進程get,生產者開啟進程put,加入隊列,全部結束后(jion),隊列put(None),消費者get到空終止

第二種方式:

import time import random from multiprocessing import JoinableQueue,Processdef producer(q,name,food):for i in range(10):time.sleep(random.random())fd = '%s%s'%(food,i)q.put(fd)print('%s生產了一個%s'%(name,food))q.join()def consumer(q,name):while True:food = q.get()time.sleep(random.random())print('%s吃了%s'%(name,food))q.task_done()if __name__ == '__main__':jq = JoinableQueue()p =Process(target=producer,args=(jq,'wusir','泔水'))p.start()c = Process(target=consumer,args=(jq,'alex'))c.daemon = Truec.start()p.join()

JoinableQueue同樣通過multiprocessing使用。

創建隊列的另外一個類:

? JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹:

? maxsize是隊列中允許最大項數,省略則無大小限制。

方法介紹:

? JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

? q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常

? q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

進程間數據共享

from multiprocessing import Manager,Process,Lockdef func(dic,lock):with lock:dic['count'] -= 1if __name__ == '__main__':# m = Manager()with Manager() as m:l = Lock()dic = m.dict({'count':100})p_l = []for i in range(100):p = Process(target=func,args=(dic,l))p.start()p_l.append(p)for p in p_l:p.join()print(dic)

mulprocessing中有一個manager類,封裝了所有和進程相關的 數據共享 數據傳遞相關的數據類型,但是對于 字典 列表這一類的數據操作的時候會產生數據不安全, 需要加鎖解決問題,并且需要盡量少的使用這種方式.

線程

GIL鎖:全局解釋器鎖,cpython解釋器中特殊的垃圾回收機制,導致了在同一個進程中多個線程不能同時利用多核 —— python的多線程只能是并發不能是并行

所以使用所線程并不影響高io型的操作,只會對高計算型的程序由效率上的影響

主線程什么時候結束?等待所有子線程結束之后才結束 主線程如果結束了,主進程也就結束了 # multiprocessing 是完全仿照這threading的類寫的 from threading import Thread def func():print('start son thread') # 啟動線程 start Thread(target=func).start() # 開啟多個子線程 def func(i):print('start son thread',i)time.sleep(1)print('end son thread',i)for i in range(10):Thread(target=func,args=(i,)).start() print('main') # join方法 阻塞 直到子線程執行結束 import time import os from threading import Thread def func(i):print('start son thread',i)time.sleep(1)print('end son thread',i,os.getpid()) t_l = [] for i in range(10):t = Thread(target=func,args=(i,))t.start()t_l.append(t) for t in t_l:t.join() print('子線程執行完畢') # 使用面向對象的方式啟動線程 class MyThread(Thread):def __init__(self,i):self.i = isuper().__init__()def run(self):print('start',self.i,self.ident)time.sleep(1)print('end',self.i)for i in range(10):t = MyThread(i)t.start()print(t.ident) #線程id # 線程里的一些其他方法 from threading import current_thread,enumerate,active_count def func(i):t = current_thread() #當前線程對象print('start son thread',i,t.ident)time.sleep(1)print('end son thread',i,os.getpid())t = Thread(target=func,args=(1,)) t.start() print(t.ident) print(current_thread().ident) # 水性楊花 在哪一個線程里,current_thread()得到的就是這個當前線程的信息 print(enumerate()) #活著的線程列表 print(active_count()) # =====len(enumerate()) terminate 結束進程,在線程中不能從主線程結束一個子線程 # 守護線程 import time from threading import Thread def son1():while True:time.sleep(0.5)print('in son1') def son2():for i in range(5):time.sleep(1)print('in son2') t =Thread(target=son1) t.daemon = True t.start() Thread(target=son2).start() time.sleep(3) # 守護線程一直等到所有的非守護線程都結束之后才結束 # 除了守護了主線程的代碼之外也會守護子線程

線程鎖

即便是線程 即便有GIL 也會出現數據不安全的問題

# 1.操作的是全局變量 # 2.做一下操作# += -= *= /+ 先計算再賦值才容易出現數據不安全的問題# 包括 lst[0] += 1 dic['key']-=1 a = 0 def add_f(lock):global afor i in range(200000):with lock:a += 1def sub_f(lock):global afor i in range(200000):with lock:a -= 1from threading import Thread,Lock lock = Lock() t1 = Thread(target=add_f,args=(lock,)) t1.start() t2 = Thread(target=sub_f,args=(lock,)) t2.start() t1.join() t2.join() print(a)

加鎖會影響程序的執行效率,但是保證了數據的安全

互斥鎖是鎖中的一種:在同一個線程中,不能連續acquire多次

☆帶鎖的單例模式

import time from threading import Lock class A:__instance = Nonelock = Lock()def __new__(cls, *args, **kwargs):with cls.lock:if not cls.__instance:time.sleep(0.1)cls.__instance = super().__new__(cls)return cls.__instancedef __init__(self,name,age):self.name = nameself.age = agedef func():a = A('alex', 84)print(a)from threading import Thread for i in range(10):t = Thread(target=func)t.start()

遞歸鎖

from threading import RLock # rlock = RLock() # rlock.acquire() # print('*'*20) # rlock.acquire() # print('-'*20) # rlock.acquire() # print('*'*20)

優點:在同一個線程中,可以連續acuqire多次不會被鎖住

缺點:占用了更多資源

死鎖現象:在某一些線程中出現陷入阻塞并且永遠無法結束阻塞的情況就是死鎖現象

1.多把鎖+交替使用

2.互斥鎖在一個線程中連續acquire

避免方法:在一個線程中只有一把鎖,并且每一次acquire之后都要release

解決方法:可以用遞歸鎖解決,也可以通過優化代碼邏輯解決.

import time from threading import RLock,Thread # noodle_lock = RLock() # fork_lock = RLock() noodle_lock = fork_lock = RLock() print(noodle_lock,fork_lock) def eat1(name,noodle_lock,fork_lock):noodle_lock.acquire()print('%s搶到面了'%name)fork_lock.acquire()print('%s搶到叉子了' % name)print('%s吃了一口面'%name)time.sleep(0.1)fork_lock.release()print('%s放下叉子了' % name)noodle_lock.release()print('%s放下面了' % name)def eat2(name,noodle_lock,fork_lock):fork_lock.acquire()print('%s搶到叉子了' % name)noodle_lock.acquire()print('%s搶到面了'%name)print('%s吃了一口面'%name)time.sleep(0.1)noodle_lock.release()print('%s放下面了' % name)fork_lock.release()print('%s放下叉子了' % name)lst = ['alex','wusir','taibai','yuan'] Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start() Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

互斥鎖解決

import time from threading import Lock,Thread lock = Lock() def eat1(name,noodle_lock,fork_lock):lock.acquire()print('%s搶到面了'%name)print('%s搶到叉子了' % name)print('%s吃了一口面'%name)time.sleep(0.1)print('%s放下叉子了' % name)print('%s放下面了' % name)lock.release()def eat2(name,noodle_lock,fork_lock):lock.acquire()print('%s搶到叉子了' % name)print('%s搶到面了'%name)print('%s吃了一口面'%name)time.sleep(0.1)print('%s放下面了' % name)print('%s放下叉子了' % name)lock.release()lst = ['alex','wusir','taibai','yuan'] Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start() Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

先進先出隊列

from queue import Queue

后進先出隊列---棧

from queue import LifoQueue

優先級隊列

自動的排序 搶票的用戶級別 100000 100001 告警級別 from queue import PriorityQueue pq = PriorityQueue() pq.put((10,'alex')) pq.put((6,'wusir')) pq.put((20,'yuan')) print(pq.get()) print(pq.get())

預先的開啟固定個數的進程數,當任務來臨的時候,直接提交給已經開好的進程,讓這個進程去執行就可以了,節省了進程,線程的開啟關閉的切換時間,并且減輕了操作系統調度的負擔.

開啟步驟

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # 創建一個池子 tp = ThreadPoolExecutor(池中線程/進程的個數) # 異步提交任務 ret = tp.submit(函數,參數1,參數2....) # 獲取返回值 ret.result() # 在異步的執行完所有任務之后,主線程/主進程才開始執行的代碼 tp.shutdown() 阻塞 直到所有的任務都執行完畢 # 關閉池之后就不能繼續提交任務,并且會阻塞,直到已經提交的任務完成 # map方法 ret = tp.map(func,iterable) #迭代獲取iterable中的內容,作為func的參數,讓子線程來執行對應的任務 for i in ret: 每一個都是任務的返回值 # 回調函數 ret.add_done_callback(函數名) # 要在ret對應的任務執行完畢之后,直接繼續執行add_done_callback綁定的函數中的內容,并且ret的結果會作為參數返回給綁定的函數

帶參數及返回值

def func(i,name):print('start',os.getpid())time.sleep(random.randint(1,3))print('end', os.getpid())return '%s * %s'%(i,os.getpid()) if __name__ == '__main__':p = ProcessPoolExecutor(5)ret_l = []for i in range(10):ret = p.submit(func,i,'alex')ret_l.append(ret)for ret in ret_l:print('ret-->',ret.result()) # ret.result() 同步阻塞print('main',os.getpid())

回調函數

import requests from concurrent.futures import ThreadPoolExecutor def get_page(url):res = requests.get(url)return {'url':url,'content':res.text}def parserpage(ret): #必須有參數dic = ret.result()print(dic) tp = ThreadPoolExecutor(5) url_lst = ['http://www.baidu.com', # 3'http://www.cnblogs.com', # 1'http://www.douban.com', # 1'http://www.tencent.com','http://www.cnblogs.com/Eva-J/articles/8306047.html','http://www.cnblogs.com/Eva-J/articles/7206498.html', ] ret_l = [] for url in url_lst:ret = tp.submit(get_page,url)ret_l.append(ret)ret.add_done_callback(parserpage) 回調函數add_done_callback# 執行完子線程任務之后直接調用對應的回調函數# 爬取網頁 需要等待數據傳輸和網絡上的響應高IO的 -- 子線程# 分析網頁 沒有什么IO操作 -- 這個操作沒必要在子線程完成,交給回調函數 是單獨開啟線程進程還是池?# 如果只是開啟一個子線程做一件事情,就可以單獨開線程# 有大量的任務等待程序去做,要達到一定的并發數,開啟線程池# 根據你程序的io操作也可以判定是用池還是不用池?# socket的server 大量的阻塞io recv recvfrom socketserver# 爬蟲的時候 池

池的總結

hreadPoolExecutor中的幾個常用方法# tp = ThreadPoolExecutor(cpu*5)# obj = tp.submit(需要在子線程執行的函數名,參數)# obj# 1.獲取返回值 obj.result() 是一個阻塞方法# 2.綁定回調函數 obj.add_done_callback(子線程執行完畢之后要執行的代碼對應的函數)# ret = tp.map(需要在子線程執行的函數名,iterable)# 1.迭代ret,總是能得到所有的返回值# shutdown# tp.shutdown()

進程和線程中的鎖

# 所有在線程中能工作的基本都不能在進程中工作 # 在進程中能夠使用的基本在線程中也可以使用

在多進程中啟動多線程

在多進程里啟動多線程 import os from multiprocessing import Process from threading import Threaddef tfunc():print(os.getpid()) def pfunc():print('pfunc-->',os.getpid())Thread(target=tfunc).start()if __name__ == '__main__':Process(target=pfunc).start()

協程

# 協程:# 用戶級別的,由我們自己寫的python代碼來控制切換的# 是操作系統不可見的 # 在Cpython解釋器下 - 協程和線程都不能利用多核,都是在一個CPU上輪流執行# 由于多線程本身就不能利用多核# 所以即便是開啟了多個線程也只能輪流在一個CPU上執行# 協程如果把所有任務的IO操作都規避掉,只剩下需要使用CPU的操作# 就意味著協程就可以做到題高CPU利用率的效果 # 多線程和協程# 線程 切換需要操作系統,開銷大,操作系統不可控,給操作系統的壓力大# 操作系統對IO操作的感知更加靈敏# 協程 切換需要python代碼,開銷小,用戶操作可控,完全不會增加操作系統的壓力# 用戶級別能夠對IO操作的感知比較低

gevent模塊開啟協程

import time print('-->',time.sleep) import gevent from gevent import monkey monkey.patch_all() def eat():print('wusir is eating')print('in eat: ')time.sleep(1) #遇到阻塞讓出CPUreturn 'wusir finished eat'def sleep():print('小馬哥 is sleeping')time.sleep(1)print('小馬哥 finished sleep') g_l=[] for i in range(10): # 創造十個協程任務g1 = gevent.spawn(eat) g_l.append(g1) g2 = gevent.spawn(sleep) # 創造一個協程任務 g2.join() # 阻塞 直到g1任務完成為止 gevent.joinall(g_l) #jionall后面加包含gevent對象的列表 for i in g_l:print(i.value) #value取值

asyncio模塊

# 起一個任務 # async def demo(): # 協程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # # loop = asyncio.get_event_loop() # 創建一個事件循環 # loop.run_until_complete(demo()) # 把demo任務丟到事件循環中去執行# 啟動多個任務,并且沒有返回值 # async def demo(): # 協程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # # loop = asyncio.get_event_loop() # 創建一個事件循環 # wait_obj = asyncio.wait([demo(),demo(),demo()]) # loop.run_until_complete(wait_obj)# 啟動多個任務并且有返回值 # async def demo(): # 協程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # return 123 # # loop = asyncio.get_event_loop() # t1 = loop.create_task(demo()) # t2 = loop.create_task(demo()) # tasks = [t1,t2] # wait_obj = asyncio.wait([t1,t2]) # loop.run_until_complete(wait_obj) # for t in tasks: # print(t.result())# 誰先回來先取誰的結果 # import asyncio # async def demo(i): # 協程方法 # print('start') # await asyncio.sleep(10-i) # 阻塞 # print('end') # return i,123 # # async def main(): # task_l = [] # for i in range(10): # task = asyncio.ensure_future(demo(i)) # task_l.append(task) # for ret in asyncio.as_completed(task_l): # res = await ret # print(res) # # loop = asyncio.get_event_loop() # loop.run_until_complete(main())# import asyncio # # async def get_url(): # reader,writer = await asyncio.open_connection('www.baidu.com',80) # writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n') # all_lines = [] # async for line in reader: # data = line.decode() # all_lines.append(data) # html = '\n'.join(all_lines) # return html # # async def main(): # tasks = [] # for url in range(20): # tasks.append(asyncio.ensure_future(get_url())) # for res in asyncio.as_completed(tasks): # result = await res # print(result) # # # if __name__ == '__main__': # loop = asyncio.get_event_loop() # loop.run_until_complete(main()) # 處理一個任務

轉載于:https://www.cnblogs.com/machangwei-8/p/10885890.html

總結

以上是生活随笔為你收集整理的并发编程.md的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。