python学习并发编程
生活随笔
收集整理的這篇文章主要介紹了
python学习并发编程
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、并發編程理論基礎
- 并發編程得應用:
- 網絡應用:爬蟲(直接應用并發編程)
- 網絡架構django、flask、tornado 源碼-并發編程
- socket server 源碼-并發編程
- 計算機操作系統發展史
- 手工操作-讀穿孔的紙帶、用戶獨占全機、cpu等待手工操作、cpu利用不充分
- 批處理-磁帶存儲(聯機批處理{一邊寫入磁帶,一邊從磁帶讀出計算}、脫機批處理、只能運行一個程序,遇到IO就等待閑置
- 多道程序系統
- 同時執行多個任務,遇到io就切換,
- 即空間隔離(多個程序運行),時空復用(IO切換)的特點
- 分時系統
- 同時執行多個任務,沒有遇到IO也切換,固定時間片到了就切
- 時間片輪轉
- 多路性、交互性、獨立性、及時性
- 切會浪費cpu時間,降低了cpu效率,但提高了用戶體驗。
- 實時系統
- 及時響應
- 高可靠性
- 時刻等待響應,即時處理
- 通用操作系統:
- 多道批處理
- 分時
- 實時
- 操作系統分類:
- 個人計算機操作系統
- 網絡操作系統
- 分布式操作系統
- 操作系統的作用:
- 是一個協調、管理、和控制計算機硬件資源和軟件資源的控制程序。【調度硬件資源、調用管理軟件】
- 隱藏了丑陋的硬件調用接口,提供良好的抽象接口
- 管理、調度進程,并將多個進程對硬件的競爭變得有序
- I/O操作:(相對內存來說得)
- 輸入【讀到內存中】: input、f.read、accept、recv、connect
- 輸出【從內存讀出】:print、f.write、send、connect
- 文件操作/網絡操作都是IO操作
- 異步:兩個任務同時運行(并行)
- 同步:多個任務 串行 【按順序執行】
- 阻塞:等待 input、accept、recv
- 非阻塞:不等待,直接執行
- 并行:多個任務同時執行【多個CPU在同時執行任務】
- 并發:只有一個CPU,交替執行多個任務【宏觀上同時執行,實際是輪轉】
- https://www.bughui.com/2017/08/23/difference-between-concurrency-and-parallelism/
- 程序:沒有運行
- 進程:運行中得程序、計算機中最小得資源分配單元
- 進程調度:
- 多個進程(運行中得程序)在操作系統得控制下被CPU執行,去享用計算機資源
- 先來先服務調度算法
- 短作業優先
- 時間片輪轉
- 多級反饋隊列
- 進程調度得過程是不能隨意被程序影響得
- 進程三大狀態
- 就緒 程序開始運行,進入就緒隊列,等待cpu分配時間
- 運行 執行進程,單個時間片內運行完成,就釋放資源,沒有左右完,就又自動切換到就緒隊列等待下次得調度
- 阻塞 執行中得進程遇到事件入IO等導致無法執行,進入阻塞狀態,解除后進入就緒隊列等待進程調度
- 進程與父進程
- 進程 PID? ? 通過os.getpid()? 可以得到
- 父進程 PPID 負責回收一些子進程得資源后才關閉? ?通過os.getppid()
二、python中的并發編程-進程
- 包:multiprocessing
- 是python中操作、管理進程
- 創建進程、進程同步、進程池、進程之間數據共享
- python中的進程創建
- 對Windows來說創建子進程,是將主進程內存空間中的所有變量import子進程內存空間中,(import就會執行文件代碼)
- windows平臺創建子進程必須放在 if __name__ == '__main__' 下執行(不然就會循環創建子進程導致系統崩潰)
- 對Linux來說創建子進程,就是將主進程內存空間中所有復制過去,復制不會執行內存空間中的代碼
- Linux平臺就無所謂了,反正只是復制
-
創建進程:異步運行
- multiprocess.Process模塊
- Process() 由該類實例化得到得對象,表示一個子進程中得任務(尚未啟動)
- p = Process(group,target= func,agrs=('name',),kwargs={},name=)
- group 值默認始終為None
- target 表示調用對象
- args 調用對象得位置參數 元組!!!
- kwargs 調用對象得字典
- name為子進程得名稱
- 注:必須用關鍵字方式指定參數、位置參數必須是元組格式,那怕是一個參數也要寫成(n,)
- 方法:
- p.start() : 啟動進程,調用操作系統的命令,傳達要創建進程的申請,并調用子進程中的p.run()方法
- p.run(): 進程啟動時運行的方法,其是真正去調用target指定的函數,自定義類,就需要自己定義run方法
- p.terminate(): 強制終止進程p
- p.is_alive(): 查看進程狀態值,True表示運行中
- p.join(): 主線程等待子進程p運行結束后才運行 阻塞
- 實操:
- 復制代碼1 import os2 import time3 from multiprocessing import Process4 5 6 def f(name):7 print('in f', os.getpid(), os.getppid())8 print('i am is son process', name)9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=('小青',)) 13 p.start() # start不是運行一個程序,而是調用操作系統的命令,要創建子進程 14 15 print('我是主程序……')》》》結果: 我是主程序…… in f 6016 12312 i am is son process 小青結論:在另一個地方開辟內存空間,執行f函數,主進程不會阻塞等待 創建子進程
-
- 給子進程傳參:Process(target= func ,args = (參1,))必須是元組結構!!!
- 1 def f(args): 2 print('in func 2', args, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('我在主進程中……') 7 p1 = Process(target=f, args=(666,)) 8 p1.start() 9 p2 = Process(target=f, args=(777,)) 10 p2.start() 11 print('我會在子進程前運行……,因為我和他們是隔離的,不會等他們') 12 13 ''' 14 結果: 15 我在主進程中…… 16 我會在子進程前運行……,因為我和他們是隔離的,不會等他們 17 in func 2 666 8144 7660 18 in func 2 777 10820 7660 19 ''' 代碼演練
- 創建多個子進程:start()方法不是執行方法,而是創建子進程函數,真正執行任務函數是run()方法
- 1 def fu(num): 2 print("我是進程 :%d " % num) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(), os.getppid()) 7 for i in range(10): 8 Process(target=fu, args=(i,)).start() 9 print('main 66666') 10 """ 11 結果: 12 in main 11984 2064 13 main 66666 14 我是進程 :1 15 我是進程 :0 16 我是進程 :8 17 我是進程 :7 18 我是進程 :4 19 我是進程 :2 20 我是進程 :9 21 我是進程 :3 22 我是進程 :6 23 我是進程 :5 24 p.start()并沒有立即執行,而是進入就緒隊列,等帶cpu調度,所以不是有序的 25 """ 代碼演練
- join方法:阻塞、等待子進程執行完畢后再執行后面的代碼
- 1 def g(args): 2 print("in g", args) 3 4 5 if __name__ == '__main__': 6 print('in main') 7 p = Process(target=g, args=(888,)) 8 p.start() 9 p.join() 10 print('i am in main process') 11 ''' 12 in main 13 in g 888 14 i am in main process 15 結:join總是等子進程執行完畢后再執行接下來的代碼 16 ''' 代碼演練
- 多進程中運用join方法:必須保證所有子進程結束,而操作系統創建子進程順序是不定的,故需遍歷每個子進程,每個都進行join一下
- 1 def pro(i): 2 print('in func', i, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(),os.getppid()) 7 p_list = [] 8 for i in range(10): 9 p = Process(target=pro, args=(i,)) 10 p.start() # 不是運行一個程序,而是調用操作系統命令,要創建子進程,等待操作系統作業,非阻塞 11 p_list.append(p) 12 print(p_list) 13 for p in p_list: # 遍歷每個子進程,每個join一下,如果該子進程已經接收,join失效相當于pass,遍歷完成就能保證每個子進程都結束了 14 p.join() # 阻塞,直到p這個子進程執行完畢之后再繼續執行 15 print('主進程……') 16 ''' 17 in main 1480 2064 18 [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>, <Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>] 19 in func 3 6108 1480 20 in func 7 13756 1480 21 in func 5 12548 1480 22 in func 8 12116 1480 23 in func 4 10948 1480 24 in func 6 11744 1480 25 in func 9 11244 1480 26 in func 1 3968 1480 27 in func 2 9412 1480 28 in func 0 14024 1480 29 主進程…… 30 ''' 代碼演練
- p.is_alive() 和 p.terminate() :查看子進程生命狀態及強制結束子進程(terminate是非阻塞,并不會等待子進程徹底結束才執行后面的代碼,只是發一信息要終結,就不管了)
- 1 # p.is_alive方法:查看子進程生存狀態 2 # p.terminate() 強制結束子進程--非阻塞 3 def gro(i): 4 time.sleep(1) 5 print('in func', i, os.getpid(), os.getppid()) 6 7 8 if __name__ == '__main__': 9 print("in main") 10 p1 = Process(target=gro, args=(1,)) 11 p1.start() 12 # time.sleep(2) # 如果等待一會兒,就會執行函數,如果不等,就不管操作系統去建子進程,而直接執行后面的代碼,所以可能比創建子進程前就執行了 13 print(p1.is_alive()) # 檢測子進程是否還在執行任務 14 p1.terminate() # 強制結束子進程,非阻塞,不會等待狀態改變,會馬上執行后面代碼 15 print(p1.is_alive()) 16 print('主進程的代碼執行結束了……') 17 ''' 18 in main 19 True 20 True 21 主進程的代碼執行結束了…… 22 23 結:因為直接執行,主進程執行快些,子進程函數不會執行 24 ''' 代碼演練
- 通過面向對象的方法創建子進程(重點:重寫run方法,繼承Process類,繼承父類__init__方法)
- 1 class Myprocess(Process): 2 def __init__(self, name): 3 super().__init__(self) # 需繼承父類的init方法 4 self.name = name # 添加需要自己的屬性 5 6 def run(self): 7 print(self.name) # 只有重寫run方法才能將參數傳入 8 print(os.getppid(), os.getpid()) 9 10 11 if __name__ == '__main__': 12 p = Myprocess('小強') 13 p.start() 代碼演練?
- 進程與進程之間內存中的數據是隔離的!!!
- 進程與進程之間是不能自由的交換內存數據的【內存空間是不能共享的】
- 全局的變量在子進程中修改,其他進程是感知不到的【子進程的執行結果父進程是獲取不到的】
- 進程與進程之間想要同行,必須借用其他手段,且兩個進程都是自愿的【父子進程通信是通過socket】
- 1 from multiprocessing import Process 2 3 n = 100 4 5 6 def func(): 7 global n 8 n = n - 1 9 return 111 10 11 12 if __name__ == '__main__': 13 n_l = [] 14 for i in range(100): 15 p = Process(target=func) 16 p.start() 17 n_l.append(p) 18 for p in n_l: p.join() 19 print(n) 20 21 結果為:100 22 23 總結:說明子進程無法改變主進程的全局變量,本質是無法自由通信,但子進程中的n肯定減少了,只是沒法拿出來 代碼演練
- 守護進程? p.daemon = True
- 特點:守護進程的生命周期只和主進程的代碼有關系,和其他子進程沒有關系,會隨著主進程結束而結束
- 作用:報活,監控主進程生命狀態
- 主進程創建守護進程
- 守護進程會在主進程代碼結束后就終止
- 守護進程內無法再開子進程,否咋拋出異常 AssertionError: daemonic processes are not allowed to have children
- 守護進程的屬性,默認是False,如果設置程True,就表示設置這個子進程為一個守護進程
- 設置守護進程的操作應該在開子進程之前即p.start()之前
- 1 from multiprocessing import Process 2 import time 3 def func1(): 4 print('begin') 5 time.sleep(3) 6 print('wawww') 7 8 # if __name__ == '__main__': 9 # p = Process(target=func1) 10 # # p.daemon = True 11 # p.start() 12 # time.sleep(1) 13 # print('in main') 14 ''' 15 結果: 16 begin 17 in main 18 19 結論:守護進程隨著主進程結束而結束,那怕守護進程任務沒有執行完畢 20 ''' 21 22 def f1(): 23 print('begin fun1') 24 time.sleep(3) 25 print('baidu') 26 27 def f2(): 28 while True: 29 print('in f2') 30 time.sleep(0.5) 31 32 if __name__ == '__main__': 33 Process(target=f1,).start() 34 p = Process(target=f2) 35 p.daemon = True 36 # 守護進程的屬性,默認是False,如果設置成True,就表示設置這個子進程為一個守護進程 37 # 設置守護進程的操作應該在開啟子進程之前 38 p.start() 39 time.sleep(1) 40 print('in main') # 主進程in main執行完后,守護進程就會結束,但主進程并沒有結束而是等另一個子進程結束后才結束 41 42 43 # 設置成守護進程之后 會有什么效果呢? 44 # 守護進程會在主進程的代碼執行完畢之后直接結束,無論守護進程是否執行完畢 45 46 # 應用 47 # 報活 主進程還活著 48 # 100臺機器 100個進程 10000進程 49 # 應用是否在正常工作 - 任務管理器來查看 50 # 守護進程如何向監測機制報活???send/寫數據庫 51 # 為什么要用守護進程來報活呢?為什么不用主進程來工作呢??? 52 # 守護進程報活幾乎不占用CPU,也不需要操作系統去調度 53 # 主進程不能嚴格的每60s就發送一條信息 代碼演練
-
進程同步控制
- 進程的同步控制 - 進程之間有一些簡單的信號傳遞,但是用戶不能感知,且用戶不能傳遞自己想傳遞的內容
- 鎖:multiprocessing.Lock? ?*********? 【互斥鎖】
- lock = Lock()? ? ? ? ? ? # 創造一把鎖
- lock.acquire()? ? ? ? ? ?# 獲取了這把鎖的鑰匙、阻塞,鎖未換就一直阻塞著
- lock.release()? ? ? ? ? ?# 歸還這把鑰匙
- 解決多個進程共享一段數據的時候,數據會出現不安全的現象,通過加鎖來維護數據的安全性,同一刻,只允許一個線程修改數據
- 1 import json
2 import time
3 from multiprocessing import Lock
4 from multiprocessing import Process
5
6 # 鎖
7 lock = Lock() # 創造了一把鎖
8 lock.acquire() # 獲取了這把鎖的鑰匙
9 lock.release() # 歸還這把鑰匙,其他進程就可以拿鎖了
10
11
12 # 搶票的故事
13 # 需求:每個人都能查看余票、買相同車次票同一刻只能一人買完,另一人才能買
14
15
16 def search(i):
17 with open('db', encoding='utf-8') as f:
18 count_dic = json.load(f)
19 time.sleep(0.2) # 模擬網絡延遲
20 print('person %s 余票:%s 張' % (i, count_dic.get('count')))
21 return count_dic.get('count'), count_dic # 返回余票數,及字典
22
23
24 def buy(i):
25 count, count_dict = search(i)
26 if count > 0:
27 count_dict['count'] -= 1 # 有票就可以買
28 print('person %s 買票成功'% i)
29 time.sleep(2)
30 with open('db', 'w', encoding='utf-8') as f:
31 json.dump(count_dict, f) # 更改余票額度
32
33
34 def task(i, lock):
35 search(i)
36 lock.acquire() # 如果之前已經被acquire了 且 沒有被release 那么進程會在這里阻塞
37 buy(i)
38 lock.release()
39
40
41 if __name__ == '__main__':
42 lock = Lock()
43 for i in range(1, 11):
44 Process(target=task, args=(i, lock)).start()
45
46 # 當多個進程共享一段數據的時候,數據會出現不安全的現象,
47 # 需要加鎖來維護數據的安全性
48
49 '''
50 D:\install\Python36\python.exe D:/install/project/7、并發編程/3、鎖.py
51 person 6 余票:5 張
52 person 5 余票:5 張
53 person 8 余票:5 張
54 person 2 余票:5 張
55 person 4 余票:5 張
56 person 10 余票:5 張
57 person 1 余票:5 張
58 person 9 余票:5 張
59 person 3 余票:5 張
60 person 7 余票:5 張
61 person 6 余票:5 張
62 person 6 買票成功
63 person 5 余票:4 張
64 person 5 買票成功
65 person 8 余票:3 張
66 person 8 買票成功
67 person 2 余票:2 張
68 person 2 買票成功
69 person 4 余票:1 張
70 person 4 買票成功
71 person 10 余票:0 張
72 person 1 余票:0 張
73 person 9 余票:0 張
74 person 3 余票:0 張
75 person 7 余票:0 張
76
77 Process finished with exit code 0
78 ''' 代碼演練-搶票的故事
注:進程間的數據交互,本質也用到了socket通信,不過都是本地的,基于文件的,可以通過將py名寫成socket來看報錯得知。
- 1 #加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 2 雖然可以用文件共享數據實現進程間通信,但問題是: 3 1.效率低(共享數據基于文件,而文件是硬盤上的數據) 4 2.需要自己加鎖處理 5 6 #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。 7 隊列和管道都是將數據存放于內存中 8 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 9 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
- 信號量:multiprocessing.Semaphore
- 資源有限,同時允許一定數量的進程訪問修改數據,即一把鎖對應多把鑰匙
- 本質:鎖+ 計數器??
- 1 互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。 2 假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。 3 實現: 4 信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。 5 信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念 6 7 8 import time, random 9 from multiprocessing import Semaphore, Process 10 11 # ktv 只有4個房間,即同時只能四個人進去,其他人必須等其中的人出來才能進去 12 # 13 # sem = Semaphore(4) # 設置信號量個數,并發數 14 # sem.acquire() 15 # print('進去1個人,關門阻塞中') 16 # sem.acquire() 17 # print('進去第2個人,關門阻塞中') 18 # sem.acquire() 19 # print('進去第3個人,關門阻塞中') 20 # sem.acquire() 21 # print('進去第4個人,關門阻塞中') 22 # sem.release() # 必須歸還一把,才能繼續下面的代碼,不然一直阻塞中 23 # sem.acquire() 24 # print(6666) 25 # sem.release() 26 ''' 27 D:\install\Python36\python.exe D:/install/project/7、并發編程/4、信號量.py 28 進去1個人,關門阻塞中 29 進去第2個人,關門阻塞中 30 進去第3個人,關門阻塞中 31 進去第4個人,關門阻塞中 32 6666 33 34 Process finished with exit code 0 35 ''' 36 37 38 def ktv(num, sem): 39 sem.acquire() 40 print('person %s 進入了ktv' % num) 41 time.sleep(random.randint(1, 4)) 42 print('person %s 進出了ktv' % num) 43 sem.release() 44 45 46 if __name__ == '__main__': 47 sem = Semaphore(4) 48 for i in range(10): 49 Process(target=ktv, args=(i, sem)).start() 50 51 ''' 52 最開始是4個同時進入,之后又人出,才能有人進 53 D:\install\Python36\python.exe D:/install/project/7、并發編程/4、信號量.py 54 person 2 進入了ktv 55 person 8 進入了ktv 56 person 9 進入了ktv 57 person 7 進入了ktv 58 person 2 進出了ktv 59 person 6 進入了ktv 60 person 7 進出了ktv 61 person 5 進入了ktv 62 person 8 進出了ktv 63 person 1 進入了ktv 64 person 9 進出了ktv 65 person 0 進入了ktv 66 person 1 進出了ktv 67 person 4 進入了ktv 68 person 6 進出了ktv 69 person 3 進入了ktv 70 person 0 進出了ktv 71 person 5 進出了ktv 72 person 4 進出了ktv 73 person 3 進出了ktv 74 75 Process finished with exit code 0 76 ''' 代碼演練-kvt的故事
-
事件:multiprocessing.Event
-
定義:全局定義了一個‘flag’,如果標志為False,當程序執行event.wait方法時就會阻塞,如果為True,那么event.wait方法時便不再阻塞。
- 作用:主進程控制其他線程的執行,實現兩個或多個線程間的交互
- 使用:
- 事件創立之初,默認是Faslse,即阻塞狀態
- e = Event()? ? ? ? ? ? # 創建事件,默認False
- print(e.is_set())? ? ?#? 查看狀態
- e.set()? ? ? ? ? ? ? ? ? ?#? ?將標志設置為True
- e.clear()? ? ? ? ? ? ? ? #? ?將標志設置為False
- e.wait()? ? ? ? ? ? ? ? ?#? ? 等待,當標志為False,那么阻塞,當標志為True,那么非阻塞,wait什么也不做直接pass
- e.wait(timeout=10)? ? ?# 如果信號在阻塞10s之內變為True,那么就不繼續阻塞直接pass,如果就阻塞10s之后狀態還是沒變,那么繼續阻塞
- 1 from multiprocessing import Process, Event 2 import time, random 3 4 5 def car(e, n): 6 while True: 7 if not e.is_set(): # 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色 8 print('\033[31m紅燈亮\033[0m,car%s等著' % n) 9 e.wait() # 阻塞,等待is_set()的值變成True,模擬信號燈為綠色 10 print('\033[32m車%s 看見綠燈亮了\033[0m' % n) 11 time.sleep(random.randint(3, 6)) 12 if not e.is_set(): #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始 13 continue 14 print('車開遠了,car', n) 15 break 16 17 18 def police_car(e, n): 19 while True: 20 if not e.is_set():# 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色 21 print('\033[31m紅燈亮\033[0m,car%s等著' % n) 22 e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s之后沒有等到綠燈就闖紅燈走了 23 if not e.is_set(): 24 print('\033[33m紅燈,警車先走\033[0m,car %s' % n) 25 else: 26 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n) 27 break 28 29 30 31 def traffic_lights(e, inverval): 32 while True: 33 time.sleep(inverval) 34 if e.is_set(): 35 print('######', e.is_set()) 36 e.clear() # ---->將is_set()的值設置為False 37 else: 38 e.set() # ---->將is_set()的值設置為True 39 print('***********',e.is_set()) 40 41 42 if __name__ == '__main__': 43 e = Event() 44 for i in range(10): 45 p=Process(target=car,args=(e,i,)) # 創建是個進程控制10輛車 46 p.start() 47 48 for i in range(5): 49 p = Process(target=police_car, args=(e, i,)) # 創建5個進程控制5輛警車 50 p.start() 51 t = Process(target=traffic_lights, args=(e, 10)) # 創建一個進程控制紅綠燈 52 t.start() 53 54 print('============》') 代碼演練-紅綠燈的故事
-
進程間通信-隊列和管道
- IPC 進程之間的通信 multiprocessing.Queue/Pipe:進程之間的內存是不共享的,隔離的,但隊列、管道有使之通信的功能。
- 隊列 multiprocessing.Queue
- from queue import Queue # 隊列 先進先出FIFO,有序 # 應用:維護秩序的時候用的比較多,買票,搶票 q = Queue(5) # 設置隊列的長度,即元素個數,即只能放入5個元素 ret = q.qsize() # 獲得當前隊列中的元素個數,此方法不準,在多進程中,此刻獲取結果時,也許其他進程向里面加入了元素 q.put(1111) # 向隊列中放入對象,如果隊列已滿,則阻塞等待,一直到空間可用為止 ''' 參數: item:項目、元素、對象 block:默認True,隊列滿一直等待阻塞,False則為不阻塞,滿則直接主動自定義報錯 timeout:阻塞等待的時間,時間到了,還不能放,則報錯Queue.Empty異常 ''' q.put_nowait(2222) # 放入元素,滿了,不等直接報錯 q.get() # 返回q即隊列中的元素,隊列中為空,則阻塞一直等待有值為止,通向可設置timeout q.get_nowait() # 隊列為空時,直接報錯 q.empty() # 判斷是否為空,空則返回True,同樣在多進程中不準 q.full() # 判斷是否滿了,滿了則返回True,多進程中不準,主要是進程是異步操作 數據類型中的隊列及隊列中的方法,在進程、線程中通用
- 定義:先進先出、有序
- 本質:管道 + 鎖
- 作用:由于先進先出的特點+進程通信的功能+數據進程安全,經常用它來完成進程之間的通信
- # 例子1 一進程放 一進程取 from multiprocessing import Queue, Processdef con(q):print(q.get()) # 從隊列中拿,沒有直到等到有,所以那么它比其他進程快,最后也能拿到數據def pro(q):q.put(112) # 向隊列中放入112if __name__ == '__main__':q = Queue()p = Process(target=con, args=(q,))p.start()p = Process(target=pro, args=(q,))p.start()print('我在主進程中……')''' 看出隊列可以實現進程間的通信 ''' # 主放, 子取 from multiprocessing import Queue, Process def f(q):print(q.get())if __name__ == '__main__':q = Queue()Process(target=f, args=(q,)).start() # create son_processq.put(666) '''看出主進程可和子進程通信''' 代碼演練-主子通信/子子通信
- 生成者消費者模型:可解決大部分并發問題
- 并發中的問題:
- 生產數據快,消費數據慢,內存空間的浪費,【產生的數據不能丟只能放在內存中等著處理】
- 生產數據慢,消費數據快,效率低下 【總是要等著生產數據,啥也干不了】
- 作用:- 解決創造(生成)數據和處理(消費)數據的效率不平衡問題
- 實現:將創造數據和處理數據放在不同的進程中,根據他們的效率來調整進程的個數
- 1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 6 def consumer(q,name): 7 while 1: 8 food = q.get() 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q,name,food,n=10): 14 for i in range(1, n): # 定義生產10個食物 15 time.sleep(random.random()) # 模擬生產慢,消費快 16 fd = food + str(i) 17 print('%s 生產了 %s' %(name,fd)) 18 q.put(fd) 19 20 if __name__ == '__main__': 21 q = Queue(10) 22 for person in range(6): # 定義消費者多 23 Process(target=consumer, args=(q, 'person'+ str(person))).start() 24 Process(target=producer, args=(q,'小強','米飯')).start() 25 Process(target=producer, args=(q,'小東','面條')).start() 26 Process(target=producer, args=(q,'小hua','面條')).start() 27 Process(target=producer, args=(q,'小cai','面條')).start() 代碼演練-生產者生產完了結束,而消費者一直在get阻塞中,待解決
- 問題: 消費者一直在等著拿數據,生產者生產完了就結束了,生產者需要告訴消費者生產完了才合理,即向隊列中放入stop信號
- 重點:消費者有多少個,就必須要發多少個stop信號
- 解決:
- 方案一:生產者子進程中發,缺陷:但生產者數必須和消費者數一樣,消費者進程才能全部關閉
- 1 def consumer(q,name): 2 while 1: 3 food = q.get() 4 if food == 'stop':break 5 print('%s 吃了 %s ' % (name, food)) 6 time.sleep(random.random()) 7 8 9 def producer(q,name,food,n=10): 10 for i in range(1, n): # 定義生產10個食物 11 time.sleep(random.random()) # 模擬生產慢,消費快 12 fd = food + str(i) 13 print('%s 生產了 %s' %(name,fd)) 14 q.put(fd) 15 q.put('stop') 16 17 if __name__ == '__main__': 18 q = Queue(10) 19 for person in range(4): # 定義消費者多 20 Process(target=consumer, args=(q, 'person'+ str(person))).start() 21 Process(target=producer, args=(q,'小強','米飯')).start() 22 Process(target=producer, args=(q,'小東','面條')).start() 23 Process(target=producer, args=(q,'小hua','面條')).start() 24 Process(target=producer, args=(q,'小cai','面條')).start() 25 26 # 生產者加入結束信號,消費者收到后就結束,不存在還有進程在使用數據,因為隊列是先進先出的 27 28 # 且有多少個消費者就需要發多少個stop信號,不然就會導致有的的進程還在等待中 代碼演練
- 方案二:主進程中發,但必須要等生產者都結束,那么就加入p.join() ,還是得有多少個消費者,發多少個stop信號,無法自動
- 1 from multiprocessing import Queue, Process 2 import random, time 3 4 5 def consumer(q, name): 6 while 1: 7 food = q.get() 8 if food == 'stop': break 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q, name, food, n=10): 14 for i in range(1, n): # 定義生產10個食物 15 time.sleep(random.random()) # 模擬生產慢,消費快 16 fd = food + str(i) 17 print('%s 生產了 %s' % (name, fd)) 18 q.put(fd) 19 20 21 if __name__ == '__main__': 22 q = Queue(10) 23 for person in range(4): # 定義消費者4個 24 Process(target=consumer, args=(q, 'person'+ str(person))).start() 25 p1 = Process(target=producer, args=(q,'小強','米飯')) 26 p1.start() 27 p2 = Process(target=producer, args=(q,'小東','面條')) 28 p2.start() 29 p1.join() # 保證p生產者結束 30 p2.join() 31 q.put('stop') # 必須得發四個stop信號 32 q.put('stop') 33 q.put('stop') 34 q.put('stop') 代碼演練
- ? ? ? 方案三:
- JoinableQueue([maxsize])
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。? - 1 JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法: 2 3 q.task_done() 4 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。 5 6 q.join() 7 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 8 下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。 JoinableQueue使用方法 1 from multiprocessing import Process,JoinableQueue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了 9 10 def producer(name,q): 11 for i in range(10): 12 time.sleep(random.randint(1,3)) 13 res='%s%s' %(name,i) 14 q.put(res) 15 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 q.join() #生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。 17 18 19 if __name__ == '__main__': 20 q=JoinableQueue() 21 #生產者們:即廚師們 22 p1=Process(target=producer,args=('包子',q)) 23 p2=Process(target=producer,args=('骨頭',q)) 24 p3=Process(target=producer,args=('泔水',q)) 25 26 #消費者們:即吃貨們 27 c1=Process(target=consumer,args=(q,)) 28 c2=Process(target=consumer,args=(q,)) 29 c1.daemon=True 30 c2.daemon=True 31 32 #開始 33 p_l=[p1,p2,p3,c1,c2] 34 for p in p_l: 35 p.start() 36 37 p1.join() 38 p2.join() 39 p3.join() 40 print('主') 41 42 #主進程等--->p1,p2,p3等---->c1,c2 43 #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據 44 #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨著主進程的結束而結束,所以設置成守護進程就可以了。 代碼演練
- 消費者每消費處理完一個數據,就向生產者發送消息,通過q.task_done
- 生產者每生產一個數據就在計數器+1,每接到消費者發得q.task_done,就-1,消費者消費完了,生產者計數也為0了,故才會結束,通過q.join()
- 為了讓作業完成,程序能關閉,等生產者結束,主進程也跟著關閉即可,即生成者.join()一下即可
- 那么就可以把消費者設置成守護進程,主進程結束,守護進程也就結束了,這樣就ok了。
- ==》消費者消費完畢
- ==》生產者結束
- ==》主進程代碼結束,從而守護進程進程結束
- ==》消費者結束
- 注:q.join() 只是保證生產者結束即可,那么在子進程join和主進程中join就一樣了。
-
注:棧 ==> 先進后出----算法
-
管道:multiprocessing.Pipe
- 定義:IPC通信的一種機制,隊列就是基于管道來完成的通信的,但是管道是原生的通信方式,在進程之間會產生數據不安全的情況,需要自己手動加鎖來處理,管道在數據傳輸過程中,還涉及到一個端口管理,這個需要我們在代碼中做處理才能使代碼更完善。
- 管道使用中待解決的問題:
- 1、多進程通信數據不安全
-
答:不安全是因多個進程可能會在同一刻同時去取同一個數據,也可能同一刻拿走一個數據,位置就空了,別的應該放在空位置上,
? ? ? ? 而因同一刻時,另一個進程會認為該位置上有數據,就放在后面了,這就會導致數據異常,解決方法就是在拿數據前加鎖,拿完歸還鎖。 - 2、端口問題,生產者或消費者沒有使用的端口需要關閉,不然,消費者就會認為還有數據要接收,就會一直recv中,因此就需要生成者關閉數據的輸出端,對應的消費者就需要關閉數據的輸入端,才能保證當消費者recv不到數據了通過報錯EOFerror而停止。
- 1 from multiprocessing import Pipe, Process 2 3 4 # 管道 5 # 隊列是基于管道實現的 6 # 隊列 進程間數據安全的 7 # 管道 進程間數據不安全的 8 # 隊列 = 管道 + 鎖 9 10 # left, right = Pipe() 11 # print(right.recv()) 12 # (<multiprocessing.connection.PipeConnection object at 0x000002817A7FB128>, <multiprocessing.connection.PipeConnection object at 0x000002817A65C128>) 13 # 管道對象返回的是一個元組 14 15 16 # 所有的IPC通信都是通過socket實現的 17 18 # 左邊放,右邊出,同樣可以左收,右發,全雙工模式 19 20 # 管道必須在創建進程前創建 21 def consumer(left, right): 22 left.close() # 消費者用右邊接那么就把左邊關閉 23 while 1: 24 try: 25 print(right.recv()) 26 except EOFError: # 再也接不到數據了,從而報錯,才能退出 27 break 28 29 30 if __name__ == '__main__': 31 left, right = Pipe() 32 p = Process(target=consumer, args=(left, right)) 33 p.start() 34 right.close() # 用右邊發送,那么左邊就關閉 35 for i in range(1, 11): 36 left.send(3333) 37 left.close() # 不用了就關閉 38 39 # EOF異常的觸發 40 # 在這一個進程中 如果不在用這個端點了,應該close 41 # 這一在recv的時候,如果其他端點都被關閉了,就能夠知道不會在有新的消息傳進來 42 # 此時就不會在這里阻塞等待,而是拋出一個EOFError 43 # * close并不是關閉了整個管道,而是修改了操作系統對管道端點的引用計數的處理 代碼演練-相關問題注意
-
進程間數據共享
- 進程之間數據共享
消息傳遞的并發是趨勢
線程是通過線程集合,用消息隊列來交換數據
進程間應盡量避免通信,因為可能不安全,想安全就必須加鎖,加鎖就會影響效率。
redis分布式、數據庫解決進程之間數據共享問題
'''
進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此
'''
# Manager模塊
# 所有的數據類型 都能夠進行數據共享
# 一部分都是不加鎖 不支持數據進程安全
# 不安全的解決辦法 加鎖 - 1 from multiprocessing import Manager,Process,Lock 2 def work(d,lock): 3 with lock: 4 d['count'] -= 1 5 6 if __name__ == '__main__': 7 lock = Lock() 8 m = Manager() 9 dic = m.dict({'count':100}) 10 p_l = [] 11 for i in range(100): # 開了100進程 12 p = Process(target=work, args=(dic, lock)) 13 p_l.append(p) 14 p.start() 15 for p in p_l: 16 p.join() 17 print(dic) 18 ''' 19 結果:{'count':0} 20 ''' 21 22 # with as 的機制 23 # __enter__ 24 # __exit__ 代碼演練
-
進程池:multiprocessing.Pool
- 概念:
- 幾個CPU就能同時運行幾個進程,并行, 進程的個數不是無限開啟的 會給操作系統調度增加負擔【開啟進程慢】
- 且真正能被同時執行的進程最多也就和CPU個數相同等
- 進程的開啟和銷毀都要消耗資源和時間
- 進程池中的進程不會重復開啟和關閉,而是一直在那被使用,減少時間及操作系統調度的開銷
- 進程池中的進程數保持一致,保證同一時間最多有固定數量的進程再運行,減少操作系統調度難度的同時實現并發
- 如果任務數大于池子中進程數,就只能等著,有點像信號量,但信號量進程是新建的,而進程池中的進程一直在
- 使用場景:
- 面向高計算型的場景,沒有或比較少IO型的程序 采用多進程
- 希望并行 最充分的使用CPU
- 對象方法:
- p = Pool(num)? ? 創建進程池,指定池中進程數
- p.apply(func,args=(i,) )? 同步,會等func返回值后,阻塞,下一個任務才能繼續
- p.apply_async(func,args=(j,)) 異步
- p.close()? 不是關閉進程池中的進程讓進程不工作了,而是關閉進程池,讓任務不再提交,已經接到的任務繼續執行直至執行結束
- p.join()? ?等待進程任務中的所有任務都執行完畢
- 創建一個進程池
- 1 import os 2 import time 3 from multiprocessing import Pool 4 print(os.cpu_count()) # 獲取cpu個數 5 6 7 def wahaha(): 8 time.sleep(1) 9 print(os.getpid()) 10 return True 11 12 13 if __name__ == '__main__': 14 p = Pool(5) # 進程池中進程數一般為cpu個數或者cpu+1,不要超過10個 15 for i in range(20): 16 # p.apply(func=wahaha) # 同步,一般不用,還不如一個進程去循環做,進程池有返回值,基于ipc通信,自己可以通過q來通信 17 p.apply_async(func=wahaha) # async 異步的 18 p.close() # 關閉進程池,進程池中的進程不工作了,讓任務不能再繼續提交了, 19 p.join() # 等待這個池中提交的任務都執行完,就結束 代碼演練
-
異步提交,不獲取返回值
- def wahaha():time.sleep(1)print(os.getpid())if __name__ == '__main__':p = Pool(5) # CPU的個數 或者 +1ret_l = []for i in range(20):ret = p.apply_async(func = wahaha) # async 異步的 ret_l.append(ret)p.close() # 關閉 并不是進程池中的進程不工作了# 而是關閉了進程池,讓任務不能再繼續提交了p.join() # 等待這個池中提交的任務都執行完# # 表示等待所有子進程中的代碼都執行完 主進程才結束 代碼演練
-
異步提交,獲得返回值,等待所有任務都執行完畢之后再統一獲取結果
- 1 # 異步提交,獲取返回值,等待所有任務都執行完畢之后再統一獲取結果 2 def wahaha(): 3 time.sleep(1) 4 print(os.getpid()) 5 return True 6 7 if __name__ == '__main__': 8 p = Pool(5) # CPU的個數 或者 +1 9 ret_l = [] 10 for i in range(20): 11 ret = p.apply_async(func = wahaha) # async 異步的 12 ret_l.append(ret) 13 p.close() # 關閉 不是進程池中的進程不工作了 14 # 而是關閉了進程池,讓任務不能再繼續提交了 15 p.join() # 等待這個池中提交的任務都執行完 16 for ret in ret_l: 17 print(ret.get()) 代碼演練
-
異步提交,獲得返回值,一個任務執行完畢之后就可以獲取到一個結果(順序是按照提交任務的順序)【用在任務關聯不大的時候】
- 1 def wahaha(): 2 time.sleep(1) 3 print(os.getpid()) 4 return True 5 6 if __name__ == '__main__': 7 p = Pool(5) # CPU的個數 或者 +1 8 ret_l = [] 9 for i in range(20): 10 ret = p.apply_async(func = wahaha) # async 異步的 11 ret_l.append(ret) 12 for ret in ret_l: 13 print(ret.get()) 代碼演練,不用p.close()和p.join(),ret.get()會阻塞主進程按順序一個個獲取結果
-
總結
- 1 # 異步的 apply_async 2 # 1.如果是異步的提交任務,那么任務提交之后進程池和主進程也異步了, 3 #主進程不會自動等待進程池中的任務執行完畢 4 # 2.如果需要主進程等待,需要p.join 5 # 但是join的行為是依賴close 6 # 3.如果這個函數是有返回值的 7 # 也可以通過ret.get()來獲取返回值 8 # 但是如果一邊提交一遍獲取返回值會讓程序變成同步的 9 # 所以要想保留異步的效果,應該講返回對象保存在列表里,所有任務提交完成之后再來取結果 10 # 這種方式也可以去掉join,來完成主進程的阻塞等待池中的任務執行完畢 總結
-
進程池解決原生socket,同一時刻只能和一個客戶端連接【這種方式的弊端是同時最多只能和進程池中的數量相同,其它用戶等待】
- 1 import socket 2 from multiprocessing import Pool 3 4 5 def talk(conn): 6 try: 7 while 1: 8 msg = conn.recv(1024).decode('utf-8') 9 print(msg) 10 conn.send(b'hello') 11 finally: 12 conn.close() 13 14 15 if __name__ == '__main__': 16 sk = socket.socket() 17 sk.bind(('127.0.0.1', 9999)) 18 sk.listen() 19 pool = Pool(5) 20 try: 21 while 1: 22 conn, addr = sk.accept() 23 pool.apply_async(talk, args=(conn,)) 24 finally: 25 conn.close() 26 sk.close() server
- 1 import socket 2 3 ip_port = ('127.0.0.1', 9999) 4 sk = socket.socket() 5 sk.connect(ip_port) 6 7 while 1: 8 msg = input('>>>>:').strip() 9 if len(msg) == 0: continue 10 sk.send(msg.encode('utf-8')) 11 content = sk.recv(1024).decode('utf-8') 12 print(content) client
-
Pool中的回調函數callback
- 定義:將一個進程的的執行結果的返回值,會當callback參數來執行callback函數,從而減少了ret.get()等I/O操作浪費的時間了。
- 作用:進程池中任何一個任務一旦處理完了,就立即告知主進程,我已執行完畢,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數就是回掉函數。
- 重點:回調函數是主進程中執行【如果有兩個任務,我的第二個任務在第一個任務執行完畢之后能夠立即被主進程執行】
- 1 import os 2 import time 3 import random 4 from multiprocessing import Pool 5 6 # 異步提交,獲取返回值,從頭到尾一個任務執行完畢之后就可以獲取到一個結果 7 def wahaha(num): 8 time.sleep(random.random()) 9 print('pid : ',os.getpid(),num) 10 return num 11 12 def back(arg): 13 print('call_back : ',os.getpid(),arg) 14 15 if __name__ == '__main__': 16 print('主進程',os.getpid()) 17 p = Pool(5) # CPU的個數 或者 +1 18 for i in range(20): 19 ret = p.apply_async(func = wahaha,args=(i,),callback=back) # async 異步的 20 p.close() 21 p.join() 22 23 # 回調函數 _ 在主進程中執行 24 # 在發起任務的時候 指定callback參數 25 # 在每個進程執行完apply_async任務之后,返回值會直接作為參數傳遞給callback的函數,執行callback函數中的代碼 26 27 28 # 北京 30min 30min +5min 5min + 5min 29 # 建設 20min 直接辦 + 5min 5min 30 # 中國 1h 20min +5 25min + 5min 31 # 農業 2h 55min + 5min 55min + 5min 32 # 工商 15min 5min 15min + 5min 33 34 # 2h10min 35 # 2h05min 代碼演練
三、python中的并發編程-線程
- 1 程序:是指令的集合,它是進程運行的靜態描述文本 2 進程:是程序的一次執行活動,是動態概念,計算機中最小資源分配單位 3 線程:cpu調度的最小單位,從屬于進程,任務的實際執行者 程序、進程、線程
- 進程:計算機中最小的資源分配單位,在利用多個cpu執行的過程中,對多個程序的資源進行管理和隔離
- 優點:實現并發,多進程異步執行多個任務,內存地址隔離,IPC通信通過Queue,Pipe
- 缺陷:創立、撤銷、切換都會有較大的時空開銷,過多的進程會造成操作系統調度的壓力,同一時刻,只能運行一個任務
- 線程:cpu 調度的最小單位
- 特點:1、輕型實體(包含程序、數據、tcb,基本不占資源)
- ? ? ? ? ? ?2、獨立調度和分派的基本單位(線程與線程的切換非常快,開銷小)
- ? ? ? ? ? ?3、共享進程資源,同一進程內線程間可相互通信
- ? ? ? ? ? ?4、并發執行
- 進程與線程區別:
- ? 1、線程屬于進程、進程負責獲取操作兄臺那個分配的資源、線程負責任務執行
- ? 2、每個進程中至少有一個線程
- ? 3、地址空間和其他資源,多進程之間內存相互隔離,多線程間共享同一進程數據
- ? 4、通信,進程間不能自由通信,通信需借助管道,隊列等工具,線程可以直接讀寫,自由通信,共用主進程Id
- ? 5、調度和切換,多進程開啟、結束時間開銷大、切換效率低,多線程開銷小、切換效率高
- ? 6、多線程操作系統中,進程不是一個可執行的實體
- cpython 解釋器下有全局解釋器鎖
- 在同一個進程中的多個線程在同一時刻只能有一個線程訪問cpu,導致多線程無法并行處例任務,利用多核處理器
- 注:程序計算的時候才會用到cpu,sleep或I/O阻塞的時候是不會用到CPU的
- jpython/pypy解釋器沒有全局解釋器鎖
-
python線程管理:threading模塊
- 創建線程:共享主進程資源,故每個線程的進程ID一樣,但各個線程有自己的id通過get_ident(),需導入類get_ident
- 1 # 通過類Thread 2 import os 3 import time 4 from threading import Thread,get_ident 5 def func(name): 6 time.sleep(0.1) 7 print('線程:%s,該線程的進程id為:%s,線程id為:%s' %(name,os.getpid(),get_ident())) # get_ident()為類get_ident中的方法,作用是獲取線程id 8 for i in range(10): 9 t = Thread(target=func, args=(i,)) 10 t.start() 11 12 》》》》結果: 13 線程:3,該線程的進程id為:14140,線程id為:13352 14 線程:0,該線程的進程id為:14140,線程id為:1672 15 線程:1,該線程的進程id為:14140,線程id為:15808 16 線程:2,該線程的進程id為:14140,線程id為:10136 17 線程:7,該線程的進程id為:14140,線程id為:4508 18 線程:6,該線程的進程id為:14140,線程id為:15068 19 線程:4,該線程的進程id為:14140,線程id為:13532 20 線程:5,該線程的進程id為:14140,線程id為:10836 21 線程:9,該線程的進程id為:14140,線程id為:9520 22 線程:8,該線程的進程id為:14140,線程id為:15568 代碼演練-方式一:通過類Thread
- 1 # 自定義類創建多線程 【繼承Thread類,同時重些run方法,如需添加自己的屬性或者說加入參數繼承父類init方法】 2 import os 3 from threading import Thread, get_ident 4 5 6 class Mythread(Thread): 7 def __init__(self, args): 8 super().__init__() 9 self.args = args 10 11 def run(self): 12 """ 13 執行函數 14 :return: 15 """ 16 print(self.args) # 就可以調用自己自定義的屬性了 17 print('in thread 子線程Id:', get_ident(), '進程Id:', os.getpid()) 18 19 20 print("父進程python解釋器:", os.getppid()) 21 print('進程即執行py文件:', os.getpid()) 22 print('主線程:', get_ident()) 23 t_obj = Mythread('china') 24 t_obj.start() 25 ''' 26 結果: 27 父進程python解釋器: 1168 28 進程及執行py文件: 2724 29 主線程: 7468 30 china 31 in thread 子線程Id: 8180 進程Id: 2724 32 ''' 代碼演練-方式二:面向對象自定義類
- 多線程與多進程效率對比:線程開閉切遠遠高于進程
- 1 '''效率:多線程開閉切開銷遠遠小于進程隔了大幾百倍''' 2 3 def func(a): 4 a += 1 5 6 7 if __name__ == '__main__': 8 start = time.time() 9 t_lis = [] 10 for i in range(50): 11 t = Thread(target=func, args=(i,)) 12 t.start() 13 t_lis.append(t) 14 for t in t_lis:t.join() 15 print('主線程') 16 print('時間:%s' % str(time.time() - start)) 17 18 start = time.time() 19 t_lis = [] 20 for i in range(50): 21 t = Process(target=func, args=(i,)) 22 t.start() 23 t_lis.append(t) 24 for t in t_lis: t.join() 25 print('主進程') 26 print('時間:%s' % str(time.time() - start)) 27 28 ''' 29 效率測試: 30 主線程 31 時間:0.008229732513427734 32 主進程 33 時間:5.307320833206177 34 ''' 代碼演練
- 多線程數據共享:能夠改變進程全局變量,不加鎖,數據不安全
- 1 '''線程間數據共享''' 2 from threading import Thread 3 4 n = 100 # 全局變量,存在主進程內存空間中 5 6 7 def func(): 8 global n 9 n -= 1 10 print(n) 11 12 13 if __name__ == '__main__': 14 t_li = [] 15 for i in range(100): 16 t = Thread(target=func) # 始終在同一個內存空間中 17 t.start() 18 t_li.append(t) 19 for t in t_li: 20 t.join() 21 print("線程:", n) 22 ''' 23 結果: 24 n = 0 25 每個線程中的n從99,開始遞減 26 ''' 27 28 p_li = [] 29 for i in range(100): 30 p = Process(target=func,) # 分別創建100個獨立的內存空間,且相互獨立 31 p.start() 32 p_li.append(p) 33 for p in p_li: 34 p.join() 35 print('進程:',n) 36 ''' 37 結果: 38 n = 100 39 每個進程中的n為99 40 ''' 代碼演練
- 守護線程:隨主線程運行結束而結束,同進程不一樣,進程是主進程代碼結束? ?【線程不能手動關閉沒有terminate方法,只能通過守護線程來關閉,或者用模塊】
- 1 '''守護線程等待主線程結束而結束, 2 # 主線程結束,必須等所有非守護線程結束,才能結束 3 # 主線程結束,進程就結束了,進程必須保證所有非守護線程結束才行''' 4 import os 5 import time 6 from threading import Thread 7 8 def f1(): 9 print(True) 10 time.sleep(0.5) 11 print(os.getpid()) 12 13 def f2(): 14 print('in f2 start') 15 time.sleep(3) 16 print('in f2 end') 17 print(os.getpid()) 18 19 t = Thread(target=f1) 20 t.setDaemon(True) 21 t.start() 22 23 t2 = Thread(target=f2) 24 t2.start() 25 print('主線程',os.getpid()) 26 27 ''' 28 True 29 主線程 1440 30 in f2 start 31 1440 32 in f2 end 33 1440 34 ''' 代碼演練
- Thread類方法匯總
- t.is_alive()或者t.isAlive()? ? ? ?# 查看主線程是否存活
- t.getName()? ? ? ? ? ? ? ? ? ? ? ? ? #? 放回線程名如: Thread-1 ?
- t.setName()? ? ? ? ? ? ? ? ? ? ? ? ? #? 設置線程名
- t.start()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?# 創建線程,調用run方法,運行
- t.join()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 主線程等待t線程執行結束后才繼續,阻塞
- threading.currentTread()? ? ?#? 返回當前的線程變量,即對象:? <_MainThread(MainThread, started 4644)>
- threading.enumerate()? ? ? ? # 返回正在運行的線程對象列表,列表中線程對象肯定包括主線程+開啟的子線程
- threading.activeCount()? ? ? # 返回正在運行的線程數,即? ? len(threading.enumerate()?)
- 鎖和GIL: 互斥鎖(也叫同步鎖)、遞歸鎖、全局解釋器鎖
- 在多個進程或線程同時訪問一個數據的時候就會產生數據不安全的現象
- 只要多進程/多線程用到全局變量,就必須加鎖,保證數據安全
- GIL鎖:是同一個進程里的每一個線程同一時間只能有一個線程訪問cpu,鎖的是線程,而不是具體的內存,但并不妨礙多個進程同時訪問數據,這將導致訪問同一數據不安全現象
- 遞歸鎖:python支持同一進程/線程多次請求同一資源,從而使資源可以被多次require,直到一個線程所有的require都被release,其他線程才能獲得資源。
- 互斥鎖:同一時刻,只允許一個進程/線程訪問資源,訪問前加鎖require阻塞,訪問處理完畢后release,其他進程/線程才能訪問,實現同步,保證數據安全。
- 為什么要有GIL鎖?
- 在開發python解釋器的時候可以減少很多細粒度的鎖
- 互斥鎖與遞歸鎖那個好?
- 1、遞歸鎖,能快速解決死鎖問題,快速恢復服務
- 2、但死鎖問題的出現,是程序的設計或邏輯的問題,還應進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖問題
- 什么是死鎖?
- 白話:多把鎖同時應用在多個線程中
- 兩個共享的數據資源,兩個進程或線程各拿到了一個資源,但未同時拿到兩個資源,都不釋放,這時就會產生死鎖現象,還有就是前面require了,后面忘記release了。
- 如何解決死鎖?
- 如果在服務階段 -> 遞歸鎖快速恢復服務 ->排查邏輯 -> 互斥鎖
- 如果在測試階段 -> ->排查邏輯 -> 互斥鎖
- 互斥鎖和遞歸鎖的區別?
- 互斥鎖:就是在一個線程中不能連續多次acquire
- 遞歸鎖:可以在同一線程中acquire任意次,但同時注意相應的release次數要與acquire次數相同
- from multiprocessing/threading import RLock? 導入遞歸鎖
- from multiprocessing/threading import Lock? ? ?導入互斥鎖
- 場景: 多個人圍著桌子吃一盤面,必須拿到叉子和面才能吃 如果一個人拿到了叉子,另一個人拿到了面,就不能吃,就會導致僵直在一起import time from threading import Thread,Lock lock = Lock() noodle_lock = Lock() fork_lock = Lock() def eat1(name):noodle_lock.acquire()print('%s拿到了面' % name)fork_lock.acquire()print('%s拿到了叉子' % name)print('%s在吃面'%name)time.sleep(0.5)fork_lock.release() # 0.01noodle_lock.release() # 0.01def eat2(name):fork_lock.acquire() # 0.01print('%s拿到了叉子' % name) # 0.01 noodle_lock.acquire()print('%s拿到了面' % name)print('%s在吃面'%name)time.sleep(0.5)noodle_lock.release()fork_lock.release()eat_lst = ['alex','wusir','太白','yuan'] for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2Thread(target=eat1,args=(name,)).start()Thread(target=eat2,args=(name,)).start()結果: china拿到了面 china拿到了叉子 china在吃面 china拿到了叉子 usa拿到了面 --------程序開始卡死 科學家吃面問題-互斥鎖? ?【終極原因是兩把鎖,可能會導致不同的人各拿了一把鎖,從而導致程序卡主,發生死鎖現象】
- 1 # 遞歸鎖解決死鎖問題 2 import time 3 from threading import Thread, RLock 4 5 lock = RLock() 6 7 8 def eat1(name): 9 lock.acquire() 10 print('%s拿到了面' % name) 11 lock.acquire() 12 print('%s拿到了叉子' % name) 13 print('%s在吃面' % name) 14 time.sleep(0.5) 15 lock.release() # 0.01 16 lock.release() # 0.01 17 18 19 def eat2(name): 20 lock.acquire() # 0.01 21 print('%s拿到了叉子' % name) # 0.01 22 lock.acquire() 23 print('%s拿到了面' % name) 24 print('%s在吃面' % name) 25 time.sleep(0.5) 26 lock.release() 27 lock.release() 28 29 30 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 31 for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2 32 Thread(target=eat1, args=(name,)).start() 33 Thread(target=eat2, args=(name,)).start() 遞歸鎖解決-同一進程允許多次require
- 1 import time 2 from threading import Thread,Lock 3 lock = Lock() 4 def eat1(name): 5 lock.acquire() 6 print('%s拿到了面' % name) 7 print('%s拿到了叉子' % name) 8 print('%s在吃面'%name) 9 time.sleep(0.5) 10 lock.release() # 0.01 11 12 def eat2(name): 13 lock.acquire() # 0.01 14 print('%s拿到了叉子' % name) # 0.01 15 print('%s拿到了面' % name) 16 print('%s在吃面'%name) 17 time.sleep(0.5) 18 lock.release() 19 20 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 21 for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2 22 Thread(target=eat1,args=(name,)).start() 23 Thread(target=eat2,args=(name,)).start() 24 25 26 》》》: 27 china拿到了面 28 china拿到了叉子 29 china在吃面 30 china拿到了叉子 31 china拿到了面 32 china在吃面 33 beijing拿到了面 34 beijing拿到了叉子 35 beijing在吃面 36 beijing拿到了叉子 37 beijing拿到了面 38 beijing在吃面 39 shanghai拿到了面 40 shanghai拿到了叉子 41 shanghai在吃面 42 shanghai拿到了叉子 43 shanghai拿到了面 44 shanghai在吃面 45 shenzhen拿到了面 46 shenzhen拿到了叉子 47 shenzhen在吃面 48 shenzhen拿到了叉子 49 shenzhen拿到了面 50 shenzhen在吃面 互斥鎖解決-更改設計邏輯,同時拿到叉子和面
- 信號量:同進程用法一樣,池的效率高于信號量,無論進程,線程? ? 【鎖 + 計數器】
- 1 # 進程中創建信號量,與開啟進程池效率對比 2 # 池效率高于信號量 3 def ktv1(sem, i): 4 sem.acquire() 5 i += 1 6 sem.release() 7 8 9 def ktv2(i): 10 i += 1 11 12 13 # process 14 if __name__ == '__main__': 15 sem = Semaphore(5) # 同時只執行5個任務 16 start_time = time.time() 17 p_list = [] 18 for i in range(20): # 開啟20個任務 19 p = Process(target=ktv1, args=(sem, i)) # 開啟進程20個 20 p.start() 21 p_list.append(p) 22 for p in p_list: p.join() 23 print('process_semaphore:', time.time() - start_time) 24 25 pool = Pool(5) # 開啟進程池,同一時間執行5個任務 26 start_time1 = time.time() 27 pool_list = [] 28 for i in range(20): # 開啟20個任務 29 ret = pool.apply_async(func=ktv2, args=(i,)) # 異步 30 pool_list.append(ret) 31 pool.close() # 關閉進程池,不再受理任務 32 pool.join() # 等待所有進程池中的任務結束 33 print('process_pool:', time.time() - start_time1) 34 ''' 35 process_semaphore: 2.2986388206481934 36 process_pool: 0.5303816795349121 37 ''' 38 39 ========================================== 40 # thread 類中沒有線程池 41 # 但concurrent.futures中有 42 from threading import Thread,Semaphore,currentThread 43 from concurrent.futures import ThreadPoolExecutor 44 def f(sem,i): 45 sem.acquire() 46 i += 1 47 # print("sem",currentThread().getName()) # 獲取線程名 48 sem.release() 49 50 def f2(i): 51 i += 1 52 # print("pool",currentThread().getName()) 53 54 start = time.time() 55 t_sem = Semaphore(5) # 線程信號量, 同時5個任務 56 t_list = [] 57 for i in range(20): # 20個任務,開啟20個線程 58 t = Thread(target=f, args=(t_sem, i)) 59 t.start() 60 t_list.append(t) 61 for t in t_list:t.join() 62 print("in thread sem:" , time.time() - start) 63 64 start = time.time() 65 t_pool = ThreadPoolExecutor(5) 66 67 for i in range(20): 68 ret = t_pool.submit(f2,i) 69 t_pool.shutdown() 70 end = time.time() 71 print("in thread pool:", end- start) 72 ''' 73 in thread: 0.00498652458190918 74 in thread pool: 0.001001596450805664 75 ''' 代碼演練
- 事件:event,同進程用法一樣
- 1 '''事件:event,一個任務依賴另一個任務的狀態才進行下一步 2 wait 等待事件內部的信號變成True就不阻塞了 3 set 將標志改為True 4 clear 改成False 5 is_set 查看標志是否為True 6 ''' 7 # 數據庫連接 8 import time 9 import random 10 from threading import Event,Thread 11 12 13 def check(e): 14 '''檢測是否能夠連通數據庫,網絡''' 15 print('正在檢測兩臺機器之間的網絡情況……') 16 time.sleep(random.randint(2,5)) 17 e.set() # 改成True,非阻塞 18 19 20 def connet_db(e): 21 print("status:", e.is_set()) 22 e.wait() 23 print("status:", e.is_set()) 24 print('連接數據庫……') 25 print('連接數據庫成功~~~') 26 27 # e = Event() 28 # Thread(target=connet_db, args=(e,)).start() 29 # Thread(target=check, args=(e,)).start() 30 ''' 31 status: False 32 正在檢測兩臺機器之間的網絡情況…… 33 status: True 34 連接數據庫…… 35 連接數據庫成功~~~ 36 ''' 37 38 39 def check(e): 40 '''檢測是否能夠連通數據庫,網絡''' 41 print('正在檢測兩臺機器之間的網絡情況……') 42 time.sleep(random.randint(2,5)) 43 e.set() # 改成True,非阻塞 44 45 def connet_db(e): 46 '''3次連接不上就退出''' 47 n = 0 48 while n < 3: 49 if e.is_set(): 50 break # 退出循環,執行連接庫 51 else: 52 e.wait(timeout=0.5) 53 n += 1 54 if n == 3: 55 raise TimeoutError 56 print('連接數據庫……') 57 print('連接數據庫成功~~~') 58 59 60 e = Event() 61 Thread(target=connet_db, args=(e,)).start() 62 Thread(target=check, args=(e,)).start() 63 64 ''' 65 正在檢測兩臺機器之間的網絡情況…… 66 Exception in thread Thread-1: 67 Traceback (most recent call last): 68 File "D:\install\Python36\lib\threading.py", line 916, in _bootstrap_inner 69 self.run() 70 File "D:\install\Python36\lib\threading.py", line 864, in run 71 self._target(*self._args, **self._kwargs) 72 File "D:/install/project/7、并發編程/11、threading_信號量.py", line 140, in connet_db 73 raise TimeoutError 74 TimeoutError 75 ''' 代碼演練-通過事件來實現檢測網絡的狀態,從而決定是否連接數據庫
- 線程池:threading 中沒有數據池的類實現,數據池是另一個包concurrent.futures中ThreadPoolExecutor實現,這個包同樣可以啟用進程池
- 隊列:數據安全,先進先出,自帶鎖
- 1 # from multiprocessing import Queue,JoinableQueue # 進程IPC隊列 2 from queue import Queue # 線程隊列 先進先出的 3 from queue import LifoQueue # 后進先出的 4 #方法: put get put_nowait get_nowait full empty qsize 5 # 隊列Queue 6 # 先進先出 7 # 自帶鎖 數據安全 8 # 棧 LifoQueue 9 # 后進先出 10 # 自帶鎖 數據安全 11 # lq = LifoQueue(5) 幾種鎖
- 條件:使得線程等待,只有滿足某條件時,才釋放n個線程
- 1 Python提供的Condition對象提供了對復雜線程同步問題的支持。 2 Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。 3 線程首先acquire一個條件變量,然后判斷一些條件。 4 如果條件不滿足則wait; 5 如果條件滿足,進行一些處理改變條件后,通過notify方法通知其他線程,其他處于wait狀態的線程接到通知后會重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問題。
- 1 from threading import Condition 2 # acquire 3 # release 4 # wait 阻塞 5 # notify 讓wait解除阻塞的工具 6 # wait還是notify在執行這兩個方法的前后 必須執行acquire和release 7 from threading import Condition,Thread 8 def func(con,i): 9 con.acquire() 10 # 判斷某條件 11 con.wait() 12 print('threading : ',i) 13 con.release() 14 15 con = Condition() 16 for i in range(20): 17 Thread(target=func,args=(con,i)).start() 18 con.acquire() 19 # 幫助wait的子線程處理某個數據直到滿足條件 20 con.notify_all() 21 con.release() 22 while True: 23 num = int(input('num >>>')) 24 con.acquire() 25 con.notify(num) 26 con.release() 代碼演練
- 定時器:阻塞??定時器,指定n秒后執行某個操作
- 1 from threading import Timer 2 3 4 def func(): 5 print('執行我啦') 6 7 8 # interval 時間間隔 9 Timer(0.2, func).start() # 定時器 10 # 創建線程的時候,就規定它多久之后去執行 代碼演練
-
內置包:concurrent.futures? 高度封裝,管理數據池【可建進程/線程池】!!!
- 1 #1 介紹
2 concurrent.futures模塊提供了高度封裝的異步調用接口
3 ThreadPoolExecutor:線程池,提供異步調用
4 ProcessPoolExecutor: 進程池,提供異步調用
5 Both implement the same interface, which is defined by the abstract Executor class.
6
7 #2 基本方法
8 #submit(fn, *args, **kwargs)
9 異步提交任務
10
11 #map(func, *iterables, timeout=None, chunksize=1)
12 取代for循環submit的操作
13
14 #shutdown(wait=True)
15 相當于進程池的pool.close()+pool.join()操作
16 wait=True,等待池內所有任務執行完畢回收完資源后才繼續
17 wait=False,立即返回,并不會等待池內的任務執行完畢
18 但不管wait參數為何值,整個程序都會等到所有任務執行完畢
19 submit和map必須在shutdown之前
20
21 #result(timeout=None)
22 取得結果
23
24 #add_done_callback(fn)
25 回調函數
?
- 總結:
- 1、并發:每個線程共用進程ID(所以共享進程資源),每個子線程擁有自己的pid,通過get_ident()【需導入類get_ident】,因為GIL的存在,多線程無法并行。
- 2、數據共享:多線程之間數據共享,可改變主進程全局變量。
- 3、效率:多線程開閉,切時間開銷遠遠小于多進程
- 4、守護線程:主線程結束,守護線程結束,(主線程結束的條件是所有非守護線程結束)故,守護線程結束,意味著所有線程都結束了
- 5、守護進程:主進程代碼結束,守護進程結束,此時非守護進程并不一定都結束了,但主進程會等所有非守護進程結束才會結束,從而回收資源。
- 6、為什么還要用GIL:GIL為cpython解釋器獨有,歷史遺留,但開發python解釋器時可減少很多細粒度的鎖
- 7、開啟多線程會出現主線程ID(執行整個文件代碼 get_ident()),進程ID,子線程ID
- 8、線程開啟不用放在 if__name__== ‘__main__’ 下,因為共享同一個進程資源,進程開啟需求,因為每個進程會另開新的內存空間,同時將主進程變量都導入執行。
- 9、解決并發三個工具就能解決大部分問題:join()? ?同步控制,用戶獲取結果,鎖? 保證數據安全,池? ?提高效率,解決并發
- 10、進程池中的回調函數在主進程運行,線程池中的回調函數在子線程運行。
-
進程/線程中各種對比*****
- 進程/線程中的信號量和線程/進程池的區別?
- 信號量/進程池都是同一時刻允許固定數量的進程/線程訪問修改數據,實現并發效果。
- 信號量是有多少個任務,就開啟多少個進程/線程,進程/線程池是無論任務多少,都只開啟指定數量的進程。
- 信號量減少了操作系統進程/線程切換的負擔,但增加了進程/線程開啟,關閉的時間開銷,進程/線程池是進程/線程開啟后并不因任務結束而關閉,而是等待新任務,這樣就減少了進/線程開閉的時間開銷,也減小了操作系統的調度壓力。
- 從效率上看,進程池的效率要遠遠高于信號量開啟多進程的效率。
- 注:信號量100個人開100個任務,同時只能干5個任務,進程池100個任務,5個人干,同時也只能干5個任務
- 什么時候用多進程,什么時候用多線程?
- 開多進程/線程都是為了實現高并發
- 當任務是高并發計算型的任務,為了充分利用cpu,使用多進程較好
- 當任務為網絡/文件/數據庫等IO操作時,使用多線程較好
- 當任務為既有高并發計算型又有少量IO操作的,可以啟用多進程和多線程一起較好
- 進程與線程的區別?
- 線程不能獨立存在,必須在一個進程里
- 線程的開閉及切換的時間開銷遠遠小于進程
- 同一個進程間的多個線程數據共享,多進程間異步,數據隔離不共享
- cpython解釋器中全局解釋器鎖CIL的存在,導致同一進程多個線程同一時刻只能一個線程訪問cpu,從而不能充分利用多核處理器,而多進程就沒有這個限制
四、python中的并發編程-協程
-
- 什么是協程?
- 是一種用戶態的輕量級線程,實現單線程下的并發,由用戶程序而非操作系統調度控制,在一個線程內的多個任務之間互相切換。
- python中最簡單的切換以及狀態的保存,使用原生的yield就是協程最基本的體現。【yeild 只有程序之間的切換,沒有重利用任何IO操作的時間】
- 一般情況下都是通過gevent這個模塊來完成協程的工作,它內部使用的協程機制是greenlnet,它能夠幫助我們規避IO操作,最大限度的利用cpu。
- 協程比起線程的區別和優勢?
- 區別:
- 1、線程的切換是由操作系統控制調度的,協程的切換是由用戶級別自行控制調度的。
- 2、線程開閉,切換有時間開銷,協程切換,開閉的開銷更小,可通過gevent實現自動切換從而無時間開銷(因為協程是在同一線程下進行的,任務之間的切換是用戶級的)
- 3、線程是在進程下開啟的,協程是在線程下開啟的
- 優勢:
- 1、單線程內就就可以實現并發的效果,最大限度的理由CPU【優點】,協程程序級別的切換開銷更小,操作系統完全感知不到,因而更加輕量級【優點】。
- 2、協程能在一條線程的基礎上,遇到IO在多個任務之間互相切換,節省了線程開啟的開銷,充分的利用了一條線程來提高cpu的工作效率。
- 3、協程不存在數據不安全的問題(因為協程是遇IO切,同一刻,只有一個任務運行,同步效果)
- 4、程序不會因為協程中某一個任務進入阻塞狀態而使整條線程阻塞。
- 缺點:
- 1、本質是單線程下,無法利用多核
- 2、協程一旦出現阻塞,將會阻塞整條線程
- 特點:
- 1、必須在只有一個單線程里實現并發
- 2、修改共享數據不需要枷鎖
- 3、用戶程序里自己保存多個控制流的上下文棧
- 4、遇到IO就切換,原生協程自能手動切
- 5、gevent模塊select機制可以幫助實現監測IO,使一個協程遇到IO操作自動切換到其他協程
- 6、yield,greenlet都無法實現監測IO,自動切換,遇到阻塞還是在阻塞。
轉載于:https://www.cnblogs.com/sunxiuwen/p/9360946.html
總結
以上是生活随笔為你收集整理的python学习并发编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 图表插件Highcharts的动态化赋值
- 下一篇: [Python设计模式] 第1章 计算器