Python线程与进程 I/O多路复用
SSHClient Paramiko模塊
遠程執行命令
#用戶名密碼方式: import paramikossh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh.connect(hostname='192.168.18.204',port=22,username='root',password='123456')stdin,stdout,stderr=ssh.exec_command('df -h && ip a')result=stdout.read() print(result.decode()) ssh.close()#密鑰方式: import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('id_rsa') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh.connect(hostname='192.168.18.204',port=22,username='root',pkey=private_key)stdin,stdout,stderr=ssh.exec_command('df -h && ip a')result=stdout.read() print(result.decode())復制文件到服務端
#用戶名密碼方式: import paramikotransport = paramiko.Transport(('192.168.18.204',22)) #必須為元組,否則端口不為22時(不輸入時默認為22),會一直連接不上. transport.connect(username='root',password='123456')sftp=paramiko.SFTPClient.from_transport(transport) sftp.put('./1.txt','/root/2.txt') #上傳文件 sftp.get('/root/2.txt','./3.txt') #下載文件 transport.close()#密鑰方式: import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('id_rsa') transport=paramiko.Transport(('192.168.18.204',22)) transport.connect(username='root',pkey=private_key)sftp=paramiko.SFTPClient.from_transport(transport) sftp.get('/root/2.txt','./4.txt') sftp.put('3.txt','/root/4.txt') transport.close()進程與線程的區別
進程與線程的區別: 進程: 一個程序執行的實例,就是各種資源(內存頁,文件描述符,Open Socket )的集合。 線程:是操作系統的最小的調度單位,是一串指令的集合,創建進程時會自動創建一個線程。 線程共享內存空間,可以互相訪問。進程的內存空間是獨立的,不能互相訪問。 子進程相當于克隆一遍父進程。 進程要操作CPU,必須要創建一個線程,線程本身無法操作CPU。 進程快還是線程快,沒有可比性,一個是資源的集合,另一個是執行任務的。 線程之間的可以直接交流,兩個進程想通信,必須通過一個中間代理。 線程是獨立的,不會像進程那樣,殺了父進程子進程也會死掉。 主線程和其他線程是并行的。簡單的多線程示例
普通的調用方式
import time import threading def run(n):print('task',n)time.sleep(2) t1=threading.Thread(target=run,args=('t1',)) t2=threading.Thread(target=run,args=('t2',)) t1.start() t2.start()繼承類的調用方式,并且計算所有線程的總耗時
import time import threading class MyThread(threading.Thread):def __init__(self,n):super().__init__()self.n=ndef run(self): #無法接收參數print('task',self.n)time.sleep(2)res=[] start_time=time.time() for i in range(50):t=MyThread('t-%s' %i)res.append(t)t.start()for i in res:i.join() #join會堵塞,等待線程結束。print('總耗時: {}'.format(time.time()-start_time))遞歸鎖
threading.RLock() 多重鎖,在同一線程中可用被多次acquire。如果使用RLock,那么acquire和release必須成對出現def run1():print("grab the first part data\n")lock.acquire() #如果使用的是threading.Lock(),此處就會卡住,因為一次只能一個鎖定,其余鎖請求,需等待鎖釋放后才能獲取。global numnum += 1lock.release()return numdef run3():lock.acquire()res = run1()lock.release()print(res, )if __name__ == '__main__':num= 0lock = threading.RLock()for i in range(1):t = threading.Thread(target=run3)t.start()while threading.active_count() != 1: #線程數不等于1就繼續等待print(threading.active_count()) else:print('----all threads done---')print(num)信號量
信號量,指定允許幾個線程同時運行,但不是等待幾個線程都結束了,才允許下一批線程允許,而是結束一個放進來一個。用于連接池一類。import threading, time def run(n):semaphore.acquire()time.sleep(1)print('run the thread: %s\n' %n)semaphore.release()if __name__ == '__main__':semaphore = threading.BoundedSemaphore(3) #同一時間只允許3個線程同時存在for i in range(20):t = threading.Thread(target=run,args=(i,))t.start() while threading.active_count() != 1:pass else:print('----all threads done---')注:python多線程 不適合cpu密集型,適合io密集型任務,因為python的線程不支持使用多核,但是io不占用cpu,所以適合io密集型。
Python多進程,適合cpu密集型,因為進程可以使用多核。
Event,線程之間的交互。
import threading import timeevent=threading.Event() def Traffic_lights():event.set()count=0while True:if count >=5 and count <10:event.clear()elif count >=10:event.set()count = 0count += 1time.sleep(1)def car():while True:if event.is_set():print('\033[36;1m變綠燈了\033[0m\n')print('\033[36;1m寶馬車開始運行\033[0m\n')time.sleep(1)else:print('\033[31;1m變紅燈了\033[0m\n')print('\033[31;1m寶馬車停止運行\033[0m\n')event.wait()t1=threading.Thread(target=Traffic_lights) t1.start() c1=threading.Thread(target=car) c1.start()Queue隊列
優點:解耦,提高效率
列表與隊列的區別:列表取出一個數據,數據還存在在列表中,隊列取出數據后則會刪除隊列中的數據。
生產者消費者模型
import threading import time import queue q=queue.Queue(maxsize=10) def producer(name):i=1while True:q.put('汽車 {}'.format(i))print('生產了汽車%s' %i)i+=1def consumer(name):while True:print('{} 開走了{}'.format(name,q.get()))time.sleep(1)p1=threading.Thread(target=producer,args=('zyl',)) c1=threading.Thread(target=consumer,args=('wq',)) c2=threading.Thread(target=consumer,args=('syf',)) p1.start() c1.start() c2.start()pipe管道(類似與Queue)
from multiprocessing import Process,Pipe def run(conn):conn.send([1,2])print(conn.recv())conn.close()if __name__ == '__main__':parent_conn,child_conn=Pipe() #生成兩個連接,將子連接傳給子進程。P=Process(target=run,args=(child_conn,))P.start()print(parent_conn.recv())parent_conn.send([4,5])P.join()Manager
進程之間的數據共享。管道和Queue只是傳遞。
import os from multiprocessing import Process,Managerdef run(d,l):d['b'] = '2'l.append(os.getpid())if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(5)) #初始五個數字p_list=[]for i in range(10):P=Process(target=run,args=(d,l))P.start()p_list.append(P)for res in p_list:res.join() #等待所有進程執行完畢print(d)print(l)進程鎖
from multiprocessing import Process,Lockdef run(l,i):l.acquire()try:print('hellow word',i)finally:l.release()if __name__ == '__main__':lock=Lock()for num in range(10):Process(target=run,args=(lock,num)).start()進程池
from multiprocessing import Process,Pool import time import osdef run(i):time.sleep(2)print('in process',os.getpid())return i+100def Bar(arg):print('==>exec done:',arg)if __name__ == '__main__': #windows上必須寫這句話pool=Pool(processes=5)for num in range(15):pool.apply_async(func=run,args=(num,),callback=Bar) #callback,回調函數,進程執行完畢后,由主進程執行這個函數。print('全部開啟')pool.close() #必須要先close在關閉。pool.join()協程
#gevent,遇到io自動切換。 #gevent默認不知道urllib和socket會進行io操作。解決方法: from gevent import monkey #對所有進行i/o的操作打上一個標記 monkey.patch_all()示例(一),下載網頁: from urllib import request import gevent,time from gevent import monkeymonkey.patch_all()def f(url):print('下載網頁:',url)headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'} ##防止403錯誤req=request.Request(url=url,headers=headers)data=request.urlopen(req).read()print('%d bytes received from %s.' %(len(data),url))urls=['https://pythonwheels.com/','https://www.yahoo.com/','https://github.com' ] start=time.time() for url in urls:f(url) print('同步cost',time.time()-start) async_start=time.time() gevent.joinall([gevent.spawn(f,'https://pythonwheels.com/'),gevent.spawn(f,'https://www.yahoo.com/'),gevent.spawn(f,'https://github.com') ]) print('異步cost:',time.time()-async_start)示例(二),通過gevent實現一個Socket: import sys import socket import time import geventfrom gevent import socket, monkeymonkey.patch_all()def server(port):s = socket.socket()s.bind(('0.0.0.0', port))s.listen(500)while True:cli, addr = s.accept()gevent.spawn(handle_request, cli)def handle_request(conn):try:while True:data = conn.recv(1024)print("recv:", data)conn.send(data)if not data:conn.shutdown(socket.SHUT_WR)except Exception as ex:print(ex)finally:conn.close()if __name__ == '__main__':server(8001)事件驅動和異步IO
1.事件驅動模型就是根據一個事件來做反應,類似于生產者消費者模型。gevent就是使用了事件驅動模型,遇到I/O時注冊一個事件,然后系統執行I/O,在I/O操作完畢后回調一個事件告訴gevent,它之前注冊的事件執行完畢了。 2.緩存I/O,數據會先被拷貝到操作系統的內核緩沖區(內存),然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間(內存)。內核態就是內核空間到用戶空間。缺點是:“數據在傳輸過程中要在應用程序地址空間和內核進行多次數據拷貝”,為什么這么做因為用戶空間無法操作系統,只能調用操作系統的接口,來完成這次操作。 3.堵塞I/O的意思是,客戶端繼承使用recvfrom調用kernel,來查看是否有數據,有的話返回數據,沒有的話就會一直等待它有數據。 4.非堵塞I/O的意思是,客戶端繼承使用recvfrom調用kernel,來查看是否有數據,有的話返回數據,沒有的話就會返回一個錯誤(error),然后客戶端再次調用kernel來查看是否有數據,有的話返回數據,沒有的話就會返回一個錯誤(error),陷入循環。所以,nonblocking IO的特點是用戶進程需要不斷的主動詢問kernel數據好了沒有。 5.在單個線程中,如果使用的是堵塞I/O是沒法實現多路I/O。 6.在單個線程中,如果使用的是非堵塞I/O,是可以實現多路I/O的。單線程下如果有100個連接,使用的是非堵塞模式的話,說不準那個數據先到,所以就循環收取。某個連接沒有數據是他會返回一個(error),不會等待,但是還是會在從內核態復制數據到用戶態時間卡住。 7.I/O多路復用的特點是一個線程可以同時等待多個文件描述符(socket),其中任意一個進入就緒狀態,select()函數就可以返回。返回時并不會告訴進程是哪一個連接有數據了,可以通過select,pool,epool來查看。 8.異步I/O,用戶進程多個連接發起read之后,立刻就可以干別的事情。Kernel來幫你等待數據,然后將數據拷貝到用戶內存。返回時并不會告訴進程是哪一個連接有數據了。select poll epoll的區別,全部應用在I/O多路復用狀態。 1.select單進程打開的默認可以打開的文件數量是1024,調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。 2.poll和select的區別不大,取消了最大打開文件數量。 3.epool只有linux支持,與select的區別是在異步I/O中如果有一個連接活躍了,kernel會告訴進程是哪一個連接活躍了,沒有最大連接限制。`水平觸發`就是數據在內核態已經準備完畢了,但是進程沒有調用read去取。數據會一直保留在內核態。下次再有數據,會再次告訴進程數據準備完畢了。`邊緣觸發`就是數據在內核態已經準備完畢了,但是進程沒有調用read去取。數據會一直保留在內核態。下次再有數據,不會再次告訴進程數據準備完畢了。 4.nginx其實是I/O多路復用。 Python3里的模塊asyncio支持異步i/o.select
Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,并且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。
示例:使用select(I/O多路復用)實現socketServer。 import select #底層做了封裝,可以直接返回活動的連接 import socket import sys import queueserver=socket.socket() server.setblocking(0) #設置為非堵塞 server.bind(('localhost',9999)) server.listen(10) inputs=[server,] outputs=[] message_queues={}while True:readable,writeable,exeptional=select.select(inputs,outputs,inputs) #select()方法接收并監控3個通信列表, 第一個是要監控哪些連接,剛開始時監控自身,第2個是監控和接收所有要返回給客戶端的data(outgoing data),第3個監控那些連接的錯誤信息,#readable 返回活動的連接for s in readable:if s is server: #如果是server的話代表有新連接進來了conn,client_addr=s.accept()inputs.append(conn) #將新連接添加到監控列表message_queues[conn]=queue.Queue()else: #s不是server的話,那就只能是一個 與客戶端建立的連接的fd了,客戶端的數據過來了,在這接收。data=s.recv(1024)if data:print("收到來自[%s]的數據:"%s.getpeername()[0],data)message_queues[s].put(data) #將用戶數據存儲到Queue中if s not in outputs:outputs.append(s)else:print('客戶端斷開了: ',s)if s in outputs:outputs.remove(s)inputs.remove(s)#writeable 存儲要返回給用戶數據的連接for s in writeable:try:msg=message_queues[s].get_nowait()except queue.Empty:outputs.remove(s)else:s.send(msg)#exeptional 存儲出現錯誤的連接for s in exeptional:print("handling exception for ",s.getpeername())inputs.remove(s)if s in outputs:outputs.remove(s)s.close()del message_queues[s]selectors模塊
此模塊根據系統的不同,會使用不同的方式,Linux優先使用epoll
import selectors import socketsel=selectors.DefaultSelector() def accept(sock,mask):conn,addr=sock.accept()print('accepted', conn, 'from', addr)conn.setblocking(False)sel.register(conn, selectors.EVENT_READ, read) ##新連接注冊read回調函數def read(conn, mask):data = conn.recv(1000) # Should be readyif data:print('echoing', repr(data), 'to', conn)conn.send(data.upper()) # Hope it won't blockelse:print('closing', conn)sel.unregister(conn) #移除注冊的連接conn.close()sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True:events = sel.select() #默認阻塞,有活動連接就返回活動的連接列表for key, mask in events:callback = key.data #callback相當于accept函數callback(key.fileobj, mask) #key.fileobj是客戶端socket每日練習
簡單主機批量管理工具
(1). 主機分組
(2). 主機信息配置文件用configparser解析
(3). 可批量執行命令、發送文件,結果實時返回,執行格式如下
batch_run -h h1,h2,h3 -g web_clusters,db_servers -cmd "df -h"
batch_scp -h h1,h2,h3 -g web_clusters,db_servers -action put -local test.py -remote /tmp/
(4). 主機用戶名密碼、端口可以不同
(5). 執行遠程命令使用paramiko模塊
(6). 批量命令需使用multiprocessing并發
配置文件
運行Py
import time import shlex import configparser import threading import paramiko import osinstructions_dict={'batch_run':['-h','-g','-cmd','help'],'batch_scp':['-h','-g','-action','-local','-remote','help'], }class HOST_MANAGE(object):def __init__(self):print('\033[36;1m歡迎使用批量管理主機系統\033[0m'.center(50, '*'))print('\033[36;1m下面是該系統的使用方法;\n(1)根據序號顯示群組下的主機信息\\n(2)命令名稱 help 獲取命令幫助;\033[0m')print('\033[36;1m群組信息\033[0m'.center(50, '='))def batch_run_help(self):'''顯示batch_run命令幫助信息:return:'''print('\033[36;1mbatch_run 遠程執行命令;\\n-h:指定單獨的主機序號,使用逗號分割;\\n-g:指定群組命令會發送到該群組下所有機器,使用逗號分割;\\n-cmd:指定命令,使用引號將命令包裹;\033[0m')def batch_scp_help(self):'''顯示batch_scp命令幫助信息:return:'''print('\033[36;1mbatch_scp 文件的上傳與下載;\\n-h:指定單獨的主機,使用逗號分割;\\n-g:指定群組命令會發送到該群組下所有機器,使用逗號分割;\\n-action:指定動作,put:復制本地文件到遠程機器,get;將遠程主機文件復制到本地;\\n-local:本地路徑\\n-remote:遠程路徑\033[0m')def batch_run(self,parameter,GroupInfo,*args,**kwargs):'''此方法用于解析參數,執行命令:param parameter: 命令參數:param GroupInfo: 主機信息:param args::param kwargs::return:'''host_info=set()args=[ i for i in parameter.keys()]if 'help' in args:self.batch_run_help()return Trueelif '-h' not in args:print('缺少關鍵參數: -h')return Falseelif '-cmd' not in args:print('缺少關鍵參數: -cmd')return Falsefor i in parameter['-h'].split(','):if not i.isdigit() or int(i)-1 not in range(len(parameter)):print('-h 參數錯誤,沒有序號為%s的主機' %i)return Falseelse:i=int(i)-1host_list_info=eval(GroupInfo[i][1])host_info.add('{}:{}:{}'.format(host_list_info[0],host_list_info[1],host_list_info[2]))res=[]return_info=[]for i in host_info:ip, port, user, passwd, = [ i for i in i.split(':') ]t=threading.Thread(target=self.run_shell,args=(ip,port,user,passwd,parameter['-cmd'],return_info))res.append(t)t.start()for j in res:j.join()for k,v in return_info:print('{}'.format(k).center(50,'='))print(v.decode())return Truedef batch_scp(self,parameter,GroupInfo,*args,**kwargs):'''此方法,用于解析參數,生成線程執行復制文件的操作:param parameter::param GroupInfo::param args::param kwargs::return:'''host_info=set()args=[ i for i in parameter.keys()]if 'help' in args:self.batch_scp_help()return Trueelif '-h' not in args:print('缺少關鍵參數: -h')return Falseelif '-action' not in args:print('缺少關鍵參數: -action')return Falseelif '-local' not in args:print('缺少關鍵參數: -local')return Falseelif '-remote' not in args:print('缺少關鍵參數: -remote')return Falsefor i in parameter['-h'].split(','):if not i.isdigit() or int(i)-1 not in range(len(parameter)):print('-h 參數錯誤,沒有序號為%s的主機' %i)return Falseelse:i=int(i)-1host_list_info=eval(GroupInfo[i][1])host_info.add('{}:{}:{}:{}:{}:{}'.format(host_list_info[0],host_list_info[1],host_list_info[2],parameter['-local'],parameter['-remote'],parameter['-action']))print(host_info)res=[]for i in host_info:ip,port,user,passwd,local,remote,action,=[ i for i in i.split(':') ]if action == 'put':if os.path.isfile(local):local=os.path.abspath(local)else:print('本地沒有此文件: ',local)return Falseelif action != 'get':print('\033[31;1m -action 參數值錯誤,請重新輸入!\033[0m')return Falset=threading.Thread(target=self.scp_file,args=(ip,port,user,passwd,local,remote,action))res.append(t)t.start()for j in res:j.join()return Truedef scp_file(self,*args,**kwargs):'''此方法用于復制文件到遠程主機,args接收了ip,port等信息。:param args::param kwargs::return:'''ip, port, user, passwd, local, remote,action=[ i for i in args[:] ]try:transport = paramiko.Transport((ip,int(port)))transport.connect(username=user,password=passwd)sftp=paramiko.SFTPClient.from_transport(transport)if action == 'put':sftp.put(local,remote)else:sftp.get(remote,local)print('{}:傳輸完畢'.format(ip).center(50, '='))transport.close()except Exception as e:print('\033[31;1m復制文件失敗,以下是錯誤信息\033[0m')print('錯誤信息: {}'.format(e))return Falseelse:return Truedef run_shell(self,*args, **kwargs):ip,port,user,passwd,cmd,return_info=[ i for i in args[:] ]ssh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)try:ssh.connect(hostname=ip, port=int(port), username=user, password=passwd)stdin,stdout,stderr=ssh.exec_command(cmd)if not stderr:return_info.append((ip, stderr.read()))else:return_info.append((ip, stdout.read()))ssh.close()except Exception as e:print('\033[31;1m執行命令失敗,以下是錯誤信息\033[0m')print('錯誤信息: {}'.format(e))return Falseelse:return Truedef Analytical_command(self,instruction,*args,**kwargs):'''用于解析命令的方法,返回命令和參數:param instruction::param args::param kwargs::return:'''command = instruction.split(' ')[0]parameter = {key:value for key, value in zip(shlex.split(instruction)[1::2], \shlex.split(instruction)[2::2] if len(shlex.split(instruction)[2::2]) != 0 else (None,))}err_arg = []if command in instructions_dict:for arg in parameter.keys():if arg not in instructions_dict[command]:err_arg.append(arg)if len(err_arg) == 0:return command,parameterelse:print('\033[31;1m沒有此參數: {};請使用{} help獲取幫助\033[0m'.format(err_arg,command))return Falseelif command == 'q':return commandelse:print('\033[31;1m沒有{}命令\033[0m'.format(command))return Falsedef print_GroupInfo(self):'''此方法用于打印配置文件中的,群組信息,主機信息,返回組名和主機信息:return:'''config = configparser.ConfigParser()config.read('../conf/Batch_host.conf',encoding='utf-8')GroupInfo=config.sections()while True:for k,v in enumerate(GroupInfo,start=1):print('\033[35;1m({}).{}\033[0m'.format(k,v))select_group=input('>>: ')if select_group.isdigit() and int(select_group) >=1 and int(select_group) <= len(GroupInfo):HostInfo=config.items(GroupInfo[int(select_group)-1])print('\033[36;1m主機信息\033[0m'.center(50, '='))for k,v in enumerate(HostInfo,start=2):print('\033[34;1m({}).{}: {}\033[0m'.format(k,v[0],eval(v[1])[0]))return GroupInfo[int(select_group)-1],HostInfoelif select_group == 'q':exit()else:print('\033[31;1m沒有此群組!\033[0m')continuemanage=HOST_MANAGE() while True:GroupInfo=manage.print_GroupInfo()while True:instruction = input('[%s]>>: ' %GroupInfo[0]).strip()result=manage.Analytical_command(instruction)if type(result) == tuple:getattr(manage,result[0])(result[1],GroupInfo[1])elif result == 'q':breakelse:continue轉載于:https://www.cnblogs.com/SleepDragon/p/10599778.html
總結
以上是生活随笔為你收集整理的Python线程与进程 I/O多路复用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是异常?
- 下一篇: python的内存分配