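Multithreading and Multiprocessing in Python
Introduction
Python makes it quick to write programs, but its support for multithreading is poor, so in Python 2 multiprocessing was the more common choice. Python 3 (from 3.2) added the concurrent.futures package, which simplifies both multithreaded and multiprocess development.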
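Python GIL
Python code is executed under the control of a Python interpreter. Several interpreters exist today; the best known include CPython, PyPy, and Jython. CPython is the most widely used, and it is the original implementation, written in C.
The operating system can run multiple threads at the same time. CPython, however, was designed around an interpreter main loop that executes Python code, so it introduced the Global Interpreter Lock (GIL) to control access to the interpreter: a Python thread must first acquire the GIL before it can run.
As a result, on both single-core and multi-core CPUs, only one thread executes in the Python interpreter at any given moment, so threads cannot run Python code in parallel. This is the root cause of the often poor performance of Python multithreading on multi-core CPUs.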
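To make the interpreter discussion concrete, the following minimal sketch reports which implementation is running and how often it asks the running thread to give other threads a turn. It assumes Python 3.2 or later, where sys.getswitchinterval() is available; the helper name describe_interpreter is hypothetical and used only for illustration.

```python
import platform
import sys


def describe_interpreter():
    """Return the interpreter implementation name and its thread switch interval."""
    # e.g. "CPython", "PyPy", "Jython"
    implementation = platform.python_implementation()
    # Interval, in seconds, after which the interpreter asks the running
    # thread to yield so that another thread can be scheduled (Python 3.2+).
    interval = sys.getswitchinterval()
    return implementation, interval


impl, interval = describe_interpreter()
print("implementation:", impl)
print("thread switch interval (seconds):", interval)
```

A short interval lets threads take turns more often, but it does not let two threads execute Python code at the same instant.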
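High-Performance Approaches in Python 2
Python offers a few main ways to run multiple tasks:
- Start multiple processes, each with a single thread, and spread the tasks across the processes;
- Start a single process with multiple threads inside it, and spread the tasks across the threads;
- Start multiple processes and then start multiple threads inside each one, to run even more tasks at once. This is more complex, does not work well in practice, and is used less often.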
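The third option in the list above can be sketched roughly as follows. This is only an illustration under stated assumptions, not a recommended pattern: the function names (square_with_threads, process_entry, run_processes_with_threads) and the squaring workload are hypothetical, and the code targets Python 3.

```python
import multiprocessing
import threading


def square_with_threads(numbers):
    """Square each number in its own thread; return results in input order."""
    results = [None] * len(numbers)

    def work(index, value):
        results[index] = value * value

    threads = [threading.Thread(target=work, args=(i, n))
               for i, n in enumerate(numbers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def process_entry(numbers, queue):
    """Entry point of one worker process: fan out to threads, report back."""
    queue.put(square_with_threads(numbers))


def run_processes_with_threads(chunks):
    """Start one process per chunk; each process starts one thread per item."""
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=process_entry, args=(chunk, queue))
             for chunk in chunks]
    for p in procs:
        p.start()
    # Read the results before joining so no child blocks on a full pipe.
    results = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    print(run_processes_with_threads([[1, 2], [3, 4]]))
```

Even in this tiny form there are two layers of start/join bookkeeping plus inter-process communication, which hints at why the combined approach is rarely worth the complexity.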
Using Multiple Processes
The package for multiprocessing is multiprocessing.
First, look at the Process class (an excerpt from the Python 2 standard library source).

    # from multiprocessing.process import Process, current_process, active_children

    class Process(object):
        '''
        Process objects represent activity that is run in a separate process

        The class is analogous to `threading.Thread`
        '''
        _Popen = None

        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            assert group is None, 'group argument must be None for now'
            count = _current_process._counter.next()
            self._identity = _current_process._identity + (count,)
            self._authkey = _current_process._authkey
            self._daemonic = _current_process._daemonic
            self._tempdir = _current_process._tempdir
            self._parent_pid = os.getpid()
            self._popen = None
            self._target = target
            self._args = tuple(args)
            self._kwargs = dict(kwargs)
            self._name = name or type(self).__name__ + '-' + \
                         ':'.join(str(i) for i in self._identity)

A simple example of using Process:
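
    from multiprocessing import Process


    def f(name):
        # Runs in the child process.
        print('hello %s' % name)


    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()   # launch the child process
        p.join()    # wait for it to finish

Using Multiple Threads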
The package for working with threads is threading.
First, a quick look at the Thread class (an excerpt from the Python 2 standard library source).

    # Main class for threads

    class Thread(_Verbose):
        """A class that represents a thread of control.

        This class can be safely subclassed in a limited fashion.

        """
        __initialized = False
        # Need to store a reference to sys.exc_info for printing
        # out exceptions when a thread tries to use a global var. during interp.
        # shutdown and thus raises an exception about trying to perform some
        # operation on/with a NoneType
        __exc_info = _sys.exc_info
        # Keep sys.exc_clear too to clear the exception just before
        # allowing .join() to return.
        __exc_clear = _sys.exc_clear

        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            """This constructor should always be called with keyword arguments. Arguments are:

            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.

            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.

            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.

            *args* is the argument tuple for the target invocation. Defaults to ().

            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.

            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.

            """

A simple example:

    #!/usr/bin/python
    from threading import Thread


    def count(n):
        print("begin count...\r\n")
        while n > 0:
            n -= 1
        print("done.")


    def test_ThreadCount():
        t1 = Thread(target=count, args=(1000000,))
        print("start thread.")
        t1.start()
        print("join thread.")
        t1.join()


    if __name__ == '__main__':
        test_ThreadCount()

Output:

    start thread.
    begin count...
    join thread.
    done.

Performance Comparison of Multiprocessing and Multithreading
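The test code below comes from another user. It uses timeit, which is part of the Python standard library, so nothing extra needs to be installed.

    #!/usr/bin/python
    from threading import Thread
    from multiprocessing import Process
    from timeit import timeit


    def count(n):
        # Pure CPU work: count down to zero.
        while n > 0:
            n -= 1


    def test_normal():
        # Two runs back to back in the main thread.
        count(1000000)
        count(1000000)


    def test_Thread():
        # The same two runs, each in its own thread.
        t1 = Thread(target=count, args=(1000000,))
        t2 = Thread(target=count, args=(1000000,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()


    def test_Process():
        # The same two runs, each in its own process.
        t1 = Process(target=count, args=(1000000,))
        t2 = Process(target=count, args=(1000000,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()


    if __name__ == '__main__':
        print("test_normal %s" % timeit('test_normal()', 'from __main__ import test_normal', number=10))
        print("test_Thread %s" % timeit('test_Thread()', 'from __main__ import test_Thread', number=10))
        print("test_Process %s" % timeit('test_Process()', 'from __main__ import test_Process', number=10))

Output after running it:

    test_normal 1.0291161
    test_Thread 7.5084157
    test_Process 1.6441867

As the numbers show, calling the function directly is the fastest, Process comes second, and Thread is the slowest. However, this test is purely CPU-bound. When slow I/O operations are involved, Process or Thread is still the right tool.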
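The I/O case mentioned above can be illustrated with a small sketch. It assumes Python 3 and uses time.sleep as a stand-in for a slow I/O call; CPython releases the GIL while a thread is blocked like this, so the waits can overlap. The function names and the 0.2 second delay are hypothetical choices for illustration, and no timings are claimed beyond the relative ordering.

```python
import time
from threading import Thread


def fake_io(delay):
    """Stand-in for a blocking I/O call such as a network read."""
    time.sleep(delay)


def run_sequential(n, delay):
    """Perform n simulated I/O calls one after another; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start


def run_threaded(n, delay):
    """Perform n simulated I/O calls in n threads; return elapsed seconds."""
    start = time.perf_counter()
    threads = [Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential = run_sequential(4, 0.2)
threaded = run_threaded(4, 0.2)
print("sequential: %.2fs" % sequential)
print("threaded:   %.2fs" % threaded)
```

The sequential run has to wait for each call in turn, while the threaded run waits for all of them at once, which is the opposite of the CPU-bound result measured above.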
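The concurrent.futures Package in Python 3
Developers who use Java or C# will probably be familiar with futures, which exist to support concurrency.
Python 3.2 introduced the concurrent.futures package. Python 2.7 needs the futures module instead, which can be installed with pip install futures.
The concurrent.futures module gives developers a high-level interface for executing calls asynchronously. It is essentially an abstraction layer built on top of Python's threading and multiprocessing modules, and it is easier to use. Although this layer simplifies the use of those modules, it also gives up much of their flexibility.
The most important class here is Executor. Executor is an abstract class with two concrete implementations, ThreadPoolExecutor and ProcessPoolExecutor, which, as the names suggest, are execution pools of threads and processes respectively.
Now look at the definition of ProcessPoolExecutor. By default, the maximum number of workers matches the number of CPUs.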
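Because both pools share the Executor interface, code can be written against that interface and handed either pool. The following is a minimal sketch assuming Python 3; the helper names square and squares_via are hypothetical and exist only for this illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed


def square(x):
    return x * x


def squares_via(executor, numbers):
    """Submit one task per number to any Executor and collect sorted results."""
    # submit() returns a Future immediately; the call runs in the pool.
    futures = [executor.submit(square, n) for n in numbers]
    # as_completed() yields futures as they finish, in no fixed order.
    return sorted(f.result() for f in as_completed(futures))


# ThreadPoolExecutor is one concrete implementation of the abstract Executor.
assert issubclass(ThreadPoolExecutor, Executor)

with ThreadPoolExecutor(max_workers=2) as pool:
    results = squares_via(pool, [3, 1, 2])
print(results)
```

A ProcessPoolExecutor could be passed to squares_via in the same way, provided the script is guarded by if __name__ == "__main__": and the submitted function is defined at module level so it can be pickled.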

    class ProcessPoolExecutor(_base.Executor):
        def __init__(self, max_workers=None):
            """Initializes a new ProcessPoolExecutor instance.

            Args:
                max_workers: The maximum number of processes that can be used to
                    execute the given calls. If None or not given then as many
                    worker processes will be created as the machine has processors.
            """
            _check_system_limits()

            if max_workers is None:
                self._max_workers = multiprocessing.cpu_count()
            else:
                if max_workers <= 0:
                    raise ValueError("max_workers must be greater than 0")

                self._max_workers = max_workers

Next, look at the definition of ThreadPoolExecutor. Because it is mostly used to overlap I/O (or see CompleteIO), the default maximum number of workers in the version quoted here is five times the number of CPUs.

    class ThreadPoolExecutor(_base.Executor):
        def __init__(self, max_workers=None):
            """Initializes a new ThreadPoolExecutor instance.

            Args:
                max_workers: The maximum number of threads that can be used to
                    execute the given calls.
            """
            if max_workers is None:
                # Use this number because ThreadPoolExecutor is often
                # used to overlap I/O instead of CPU work.
                max_workers = (cpu_count() or 1) * 5
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers
            self._work_queue = queue.Queue()
            self._threads = set()
            self._shutdown = False
            self._shutdown_lock = threading.Lock()

Here is a simple example, adapted from another user's program:

    #!/usr/bin/python3
    import os
    from urllib.request import urlopen
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import ProcessPoolExecutor


    def downloader(url):
        print("begin down %s" % url)
        filename = os.path.basename(url)
        ext = os.path.splitext(url)[1]
        if not ext:
            raise RuntimeError("URL does not contain an extension")
        # Stream the response to disk in 1 KB chunks.
        with urlopen(url) as req, open(filename, "wb") as file_handle:
            while True:
                chunk = req.read(1024)
                if not chunk:
                    break
                file_handle.write(chunk)
        msg = "Finished downloading {filename}".format(filename=filename)
        return msg


    def mainProcess(urls):
        # Download with a pool of worker processes.
        with ProcessPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(downloader, url) for url in urls]
            for future in as_completed(futures):
                print(future.result())


    def mainThread(urls):
        # Download with a pool of worker threads.
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(downloader, url) for url in urls]
            for future in as_completed(futures):
                print(future.result())


    if __name__ == "__main__":
        urls1 = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf"]
        urls2 = ["http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
        mainProcess(urls1)
        mainThread(urls2)

Running it three times produced the following output:

    ----1
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    Finished downloading f1040ez.pdf
    Finished downloading f1040.pdf
    Finished downloading f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf
    ----2
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    Finished downloading f1040ez.pdf
    Finished downloading f1040a.pdf
    Finished downloading f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf
    ----3
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    Finished downloading f1040.pdf
    Finished downloading f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    Finished downloading f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf