基础10 多进程、协程(multiprocessing、greenlet、gevent、gevent.monkey、select、selector)...
1.多進程實現方式(類似于多線程)
1 import multiprocessing 2 import time,threading 3 4 def thread_run():#定義一個線程函數 5 print("我是子線程%s" %threading.get_ident()) ?#threading.get_ident()函數獲取當前線程的id 6 def run(name):#定義一個進程函數 7 time.sleep(1) 8 print("hello,我是進程%s" %name) 9 t = threading.Thread(target=thread_run(),)#在進程下運行子線程 10 t.start() 11 if __name__ == '__main__': ?#注意:啟多進程必須要加這個判斷,否則會有意想不到的錯誤 12 for i in range(3): 13 p = multiprocessing.Process(target=run,args=('bob %s' %i,)) 14 p.start() 15 p.join() 16 17 輸出: 18 hello,我是進程bob 0 19 我是子線程28176 20 hello,我是進程bob 1 21 我是子線程27016 22 hello,我是進程bob 2 23 我是子線程275162.進程都是由父進程創建和啟動的
1 from multiprocessing import Process 2 import os 3 4 5 def info(title): 6 print(title) 7 print('module name:', __name__) 8 print('當前進程父進程id:', os.getppid()) 9 print('當前進程 id:', os.getpid()) 10 print("\n\n") 11 12 13 def f(name): 14 info('\033[31;1mcalled from child process function f\033[0m') 15 print('hello', name) 16 17 if __name__ == '__main__': 18 info('\033[32;1mmain process line\033[0m') 19 p = Process(target=f, args=('bob',)) # 20 p.start() 21 22 輸出: 23 main process line 24 module name: __main__ 25 當前進程父進程id: 12468 #這是pycharm的進程號 26 當前進程 id: 25692 #這是第一個info函數的進程號 27 28 29 30 called from child process function f 31 module name: __mp_main__ 32 當前進程父進程id: 25692 #這是第一個info函數的進程號 33 當前進程 id: 27252 #這是f函數內調用的info函數的進程號 34 35 36 37 hello bob3.進程間是不共享內存的(不同于線程),即進程之間是不能直接交換信息的
可以用以下方式實現進程間通信,用Queue隊列(線程中用queue)
4.實現不同進程間通信的另外一種方法
?(1)管道(Pipe()),類似于socket
1 from multiprocessing import Process, Pipe 2 def f(conn): 3 conn.send([42, 'hello'])#類似于socket,這里子進程發了兩次,下邊父進程就需要收兩次 4 conn.send([32, 'yyyy']) 5 print(conn.recv()) #子進程也可以接收父進程發過來的信息 6 conn.close() 7 8 if __name__ == '__main__': 9 parent_conn, child_conn = Pipe() 10 p = Process(target=f, args=(child_conn,)) 11 p.start() 12 print(parent_conn.recv())#收兩次 13 print(parent_conn.recv()) 14 parent_conn.send('我是父進程A') #父進程可以給子進程發信息 15 p.join() 16 17 輸出: 18 [42, 'hello'] 19 [32, 'yyyy'] 20 我是父進程A5.實現進程間內存的共享(所謂共享即所有進程操作同一份列表、字典等等,其實也不真的是同一份數據,只不過是程序內部通過復制原始列表、字典給進程,進程運行以后修改列表、字典,最后合為一個列表、字典,讓人們以為是所有進程共享了相同的內存)
manager模塊可實現這一功能
6.進程鎖,防止在屏幕上或日志里打印亂了
??如這種亂:hello worhello world 8
7.很重要? 進程池????防止進程啟動太多,消耗太多資源,一般是幾個cpu就啟幾個進程
?
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def foo(i): 6 time.sleep(1) 7 print("正在執行子進程", os.getpid()) 8 return os.getpid() #返回的是子進程的id號 9 10 def bar(arg): #定義回調函數,參數arg是foo函數的返回值 11 print("子進程%s執行完畢,主進程%s調用的我"% (arg, os.getpid())) 12 13 if __name__ == '__main__': 14 pool = Pool(5) #允許進程池同時放入5個進程 15 print("主進程", os.getpid()) 16 for i in range(10): 17 pool.apply_async(func=foo, args=(i,), callback=bar)#異步,異步最好,callback后跟回調函數,回調函數是由主進程執行的 18 #回調函數的參數不用寫,默認就是前邊的func 19 # pool.apply(func=foo, args=(i,)) #同步,是串行的,看不出是5個5個的執行 20 print("主進程:end") 21 pool.close() #注意,這里要先close,再join,否則會有錯誤 22 pool.join()輸出:
1 主進程 57784 2 主進程:end 3 正在執行子進程 58060 4 子進程58060執行完畢,主進程57784調用的我 5 正在執行子進程 55428 6 子進程55428執行完畢,主進程57784調用的我 7 正在執行子進程 58356 8 子進程58356執行完畢,主進程57784調用的我 9 正在執行子進程 56716 10 子進程56716執行完畢,主進程57784調用的我 11 正在執行子進程 57380 12 子進程57380執行完畢,主進程57784調用的我13 正在執行子進程 58060 14 子進程58060執行完畢,主進程57784調用的我 15 正在執行子進程 55428 16 子進程55428執行完畢,主進程57784調用的我 17 正在執行子進程 58356 18 子進程58356執行完畢,主進程57784調用的我 19 正在執行子進程 56716 20 子進程56716執行完畢,主進程57784調用的我 21 正在執行子進程 57380 22 子進程57380執行完畢,主進程57784調用的我
?8.協程
又稱微線程,是一種用戶態的輕量級線程,協程擁有自己的寄存器和棧,協程調度切換時,將寄存器上下文和棧保存在其他地方,當切回來的時候,恢復先前保存的寄存器上下文和棧,因此,每一次協程恢復時,
都能在它離開中斷的地方繼續。
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
- 方便切換控制流,簡化編程模型
- 高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
用greenlet模塊實現協程(手動):
1 from greenlet import greenlet 2 def test1(): 3 print("test1") #4.打印 4 gr2.switch() #5.切換到test2,test2開始執行 5 print("test1...") #8.接上次離開的地方,開始執行,打印 6 gr2.switch() #9.切換到test2,test2開始執行 7 8 def test2(): 9 print("test2") #6.打印 10 gr1.switch() #7.切換到tets1,test1開始執行 11 print("test2...") #10.接上次離開的地方,開始執行,打印,程序結束 12 13 gr1 = greenlet(test1) #1.啟動一個協程 14 gr2 = greenlet(test2) #2.啟動一個協程 15 gr1.switch() #3.切換到test1,test1開始執行 16 17 輸出: 18 test1 19 test2 20 test1... 21 test2...重要:用gevent模塊實現協程(自動切換)。遇到IO(程序睡覺或卡主)便自動切換到下一個協程
1 import gevent 2 3 4 def t1(): 5 print("t1 運行") #4.t1開始運行,打印 6 gevent.sleep(2) #5.切換到下一個協程,按生成的順序,切換到t2 7 #10.程序運行到這里,只花費了不到0.1秒鐘,這時t1還在睡覺,又觸發了切換操作,切換到t2 8 #13.程序運行到這里,只花費了不到0.1秒鐘,這時t1還在睡覺,又觸發了切換操作,切換到t2 9 print("再次回到t1") #16.接上次離開的地方,執行打印操作 10 11 12 def t2(): 13 print("t2 運行") #6.t2開始運行,打印 14 gevent.sleep(1) #7.切換到下一個協程,按生成的順序,切換到t3 15 #11.程序運行到這里,只花費了不到0.1秒鐘,這時t2還在睡覺,又觸發了切換操作,切換到t3 16 #14.程序運行到這里,只花費了不到0.1秒鐘,這時t2還在睡覺,又觸發了切換操作,但是t3運行 17 # 完畢,無法切換到t3,只能切換到t1,t1還在睡覺,又切換到t2,互相切換過了1秒后,t2睡覺 18 # 完畢,這時切換到t2 19 print("再次回到t2") #15.接上次離開的地方,執行打印操作,切換到t1 20 21 22 def t3(): 23 print("t3運行") #8.t3開始運行,打印 24 gevent.sleep(0) #9.雖然睡覺0秒,但是會觸發切換操作,切換到下一個協程,按生成的順序,切換到t1 25 print("再次回到t3") #12.由于t3沒有睡覺,執行打印操作。t3運行完畢,自動切換到t1 26 27 gevent.joinall([ 28 gevent.spawn(t1), #1.生成協程1 29 gevent.spawn(t2), #2.生成協程2 30 gevent.spawn(t3), #3.生成協程3 31 ]) 32 33 輸出:t1 運行
t2 運行
t3運行
再次回到t3
再次回到t2
再次回到t1
?9.爬蟲,簡單下載網頁,使用gevent和gevent.monkey模塊(需python3.5以上,gevent在linux完美支持,在Windows可能會有問題)
1 from urllib import request 2 import gevent,time 3 from gevent import monkey 4 monkey.patch_all() #把當前程序的所有的io操作給gevent單獨的做上標記,使gevent能辨識出io操作,從而達到協程遇到io就切換的效果,不加monkey是不能識別出io的 5 6 def f(url): 7 print('GET: %s' % url) 8 resp = request.urlopen(url) 9 data = resp.read() 10 print('%d bytes received from %s.' % (len(data), url)) 11 12 urls = ['https://www.python.org/', 13 'https://www.yahoo.com/', 14 'https://github.com/' ] 15 time_start = time.time() 16 for url in urls: 17 f(url) 18 print("同步cost",time.time() - time_start) 19 async_time_start = time.time() 20 gevent.joinall([ 21 gevent.spawn(f, 'https://www.python.org/'), 22 gevent.spawn(f, 'https://www.yahoo.com/'), 23 gevent.spawn(f, 'https://github.com/'), 24 ]) 25 print("異步cost",time.time() - async_time_start)?輸出:
1 GET: https://www.python.org/ 2 47403 bytes received from https://www.python.org/. 3 GET: https://www.yahoo.com/ 4 460629 bytes received from https://www.yahoo.com/. 5 GET: https://github.com/ 6 25734 bytes received from https://github.com/. 7 同步cost 5.380241632461548 8 GET: https://www.python.org/ 9 GET: https://www.yahoo.com/ 10 GET: https://github.com/ 11 47403 bytes received from https://www.python.org/. 12 430893 bytes received from https://www.yahoo.com/. 13 25734 bytes received from https://github.com/. 14 異步cost 1.2949371337890625?10.基于協程的異步多并發
socket?server
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #通過協程實現并發 5 import sys 6 import socket 7 import time 8 import gevent 9 10 from gevent import socket,monkey 11 monkey.patch_all() 12 13 14 def server(port): 15 s = socket.socket() 16 s.bind(('0.0.0.0',port)) 17 s.listen(500)#最大連接數為500 18 while True: 19 cli, addr = s.accept() 20 gevent.spawn(handle_request, cli)# 創建協程,這里做了一個操作,將cli作為參數傳給handle_request, handle_request(cli) 21 22 23 def handle_request(conn): 24 try: 25 while True: 26 data = conn.recv(1024)#接收client的請求 27 28 with open("bbb.txt",'ab') as f:#接收文件內容,并寫入文件 29 f.write(data) 30 print("recv:", data) 31 # post = bytes('get your request: ',encoding='utf-8') 32 # post = post + data #post為要向client發送的數據,必須為bytes類型 33 conn.send(data) #向client發送數據 34 if not data: 35 conn.shutdown(socket.SHUT_WR) 36 37 except Exception as ex: 38 print(ex) 39 40 finally: 41 conn.close() 42 43 if __name__ == "__main__": 44 server(9000)client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()11.io多路復用,不是異步???基于select
server
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()12.??重要要會:IO多路復用之? selector
server
?#將會調用accept函數去新建一個連接 31 32 while True: 33 events = sel.select() #默認阻塞,有活動連接就返回活動的連接列表,events是一個列表,里邊存放所有的連接 34 for key, mask in events: #mask是連接的順序數 35 callback = key.data #調用accept函數 36 callback(key.fileobj, mask) #key.fileobj= 文件句柄
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()?
轉載于:https://www.cnblogs.com/wt11/p/5909366.html
總結
以上是生活随笔為你收集整理的基础10 多进程、协程(multiprocessing、greenlet、gevent、gevent.monkey、select、selector)...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如果是繁體,Zzk搜不搜的到呢?
- 下一篇: 通过ProGet搭建一个内部的Nuget