想用Python爬小姐姐图片?那你得先搞定分布式进程
導讀:分布式進程指的是將Process進程分布到多臺機器上,充分利用多臺機器的性能完成復雜的任務。我們可以將這一點應用到分布式爬蟲的開發中。
作者:范傳輝
如需轉載請聯系大數據(ID:hzdashuju)
分布式進程在Python中依然要用到multiprocessing模塊。multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。可以寫一個服務進程作為調度者,將任務分布到其他多個進程中,依靠網絡通信進行管理。
舉個例子:在做爬蟲程序時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址存放到Queue中,另外的進程負責從Queue中讀取鏈接地址進行下載和存儲到本地。
現在把這個過程做成分布式,一臺機器上的進程負責抓取鏈接,其他機器上的進程負責下載存儲。那么遇到的主要問題是將Queue暴露到網絡中,讓其他機器進程都可以訪問,分布式進程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地隊列的網絡化。整體過程如圖1-24所示。
▲圖1-24 分布式進程
要實現上面例子的功能,創建分布式進程需要分為六個步驟:
建立隊列Queue,用來進行進程間的通信。服務進程創建任務隊列task_queue,用來作為傳遞任務給任務進程的通道;服務進程創建結果隊列result_queue,作為任務進程完成任務后回復服務進程的通道。在分布式多進程環境下,必須通過由Queuemanager獲得的Queue接口來添加任務。
把第一步中建立的隊列在網絡上注冊,暴露給其他進程(主機),注冊后獲得網絡隊列,相當于本地隊列的映像。
建立一個對象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令。
啟動第三步中建立的實例,即啟動管理manager,監管信息通道。
通過管理實例的方法獲得通過網絡訪問的Queue對象,即再把網絡隊列實體化成可以使用的本地隊列。
創建任務到“本地”隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理。
接下來通過程序實現上面的例子(Linux版),首先編寫的是服務進程(taskManager.py),代碼如下:
from?multiprocessing.managers?import?BaseManager
#?第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class?Queuemanager(BaseManager):
????pass
#?第二步:把創建的兩個隊列注冊在網絡上,利用register方法,callable參數關聯了Queue對象,
#?將Queue對象在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
#?第三步:綁定端口8001,設置驗證口令‘qiye’。這個相當于對象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
#?第四步:啟動管理,監聽信息通道
manager.start()
#?第五步:通過管理實例的方法獲得通過網絡訪問的Queue對象
task=manager.get_task_queue()
result=manager.get_result_queue()
#?第六步:添加任務
for?url?in?["ImageUrl_"+i?for?i?in?range(10)]:
????print?'put?task?%s?...'?%url
????task.put(url)
#?獲取返回結果
print?'try?get?result...'
for?i?in?range(10):
????print?'result?is?%s'?%result.get(timeout=10)
#?關閉管理
manager.shutdown()
任務進程已經編寫完成,接下來編寫任務進程(taskWorker.py),創建任務進程的步驟相對較少,需要四個步驟:
使用QueueManager注冊用于獲取Queue的方法名稱,任務進程只能通過名稱來在網絡上獲取Queue。
連接服務器,端口和驗證口令注意保持與服務進程中完全一致。
從網絡上獲取Queue,進行本地化。
從task隊列獲取任務,并把結果寫入result隊列。
程序taskWorker.py代碼(win/linux版)如下:
import?time
from?multiprocessing.managers?import?BaseManager
#?創建類似的QueueManager:
class?QueueManager(BaseManager):
????pass
#?第一步:使用QueueManager注冊用于獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#?第二步:連接到服務器:
server_addr?=?'127.0.0.1'
print('Connect?to?server?%s...'?%?server_addr)
#?端口和驗證口令注意保持與服務進程完全一致:
m?=?QueueManager(address=(server_addr,?8001),?authkey='qiye')
#?從網絡連接:
m.connect()
#?第三步:獲取Queue的對象:
task?=?m.get_task_queue()
result?=?m.get_result_queue()
#?第四步:從task隊列獲取任務,并把結果寫入result隊列:
while(not?task.empty()):
????image_url?=?task.get(True,timeout=5)
????print('run?task?download?%s...'?%?image_url)
????time.sleep(1)
????result.put('%s--->success'%image_url)
#?處理結束:
print('worker?exit.')
最后開始運行程序,先啟動服務進程taskManager.py,運行結果如下:
put?task?ImageUrl_2?...
put?task?ImageUrl_3?...
put?task?ImageUrl_4?...
put?task?ImageUrl_5?...
put?task?ImageUrl_6?...
put?task?ImageUrl_7?...
put?task?ImageUrl_8?...
put?task?ImageUrl_9?...
try?get?result...
接著再啟動任務進程taskWorker.py,運行結果如下:
run?task?download?ImageUrl_1...
run?task?download?ImageUrl_2...
run?task?download?ImageUrl_3...
run?task?download?ImageUrl_4...
run?task?download?ImageUrl_5...
run?task?download?ImageUrl_6...
run?task?download?ImageUrl_7...
run?task?download?ImageUrl_8...
run?task?download?ImageUrl_9...
worker?exit.
當任務進程運行結束后,服務進程運行結果如下:
result?is?ImageUrl_2--->success
result?is?ImageUrl_3--->success
result?is?ImageUrl_4--->success
result?is?ImageUrl_5--->success
result?is?ImageUrl_6--->success
result?is?ImageUrl_7--->success
result?is?ImageUrl_8--->success
result?is?ImageUrl_9--->success
其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,實現大規模的分布式爬蟲。
注:由于平臺的特性,創建服務進程的代碼在Linux和Windows上有一些不同,創建工作進程的代碼是一致的。
taskManager.py程序在Windows版下的代碼如下:
#?taskManager.py?for?windows
import?Queue
from?multiprocessing.managers?import?BaseManager
from?multiprocessing?import?freeze_support
#?任務個數
task_number?=?10
#?定義收發隊列
task_queue?=?Queue.Queue(task_number);
result_queue?=?Queue.Queue(task_number);
def?get_task():
????return?task_queue
def?get_result():
????return?result_queue
#?創建類似的QueueManager:
class?QueueManager(BaseManager):
????pass
def?win_run():
????#?Windows下綁定調用接口不能使用lambda,所以只能先定義函數再綁定
????QueueManager.register('get_task_queue',callable?=?get_task)
????QueueManager.register('get_result_queue',callable?=?get_result)
????#?綁定端口并設置驗證口令,Windows下需要填寫IP地址,Linux下不填默認為本地
????manager?=?QueueManager(address?=?('127.0.0.1',8001),authkey?=?'qiye')
????#?啟動
????manager.start()
????try:
????????#?通過網絡獲取任務隊列和結果隊列
????????task?=?manager.get_task_queue()
????????result?=?manager.get_result_queue()
????????#?添加任務
????????for?url?in?["ImageUrl_"+str(i)?for?i?in?range(10)]:
????????????print?'put?task?%s?...'?%url
????????????task.put(url)
????????print?'try?get?result...'
????????for?i?in?range(10):
????????????print?'result?is?%s'?%result.get(timeout=10)
????except:
????????print('Manager?error')
????finally:
????????#?一定要關閉,否則會報管道未關閉的錯誤
????????manager.shutdown()
if?__name__?==?'__main__':
????#?Windows下多進程可能會有問題,添加這句可以緩解
????freeze_support()
????win_run()
關于作者:范傳輝,資深網蟲,Python開發者,參與開發了多項網絡應用,在實際開發中積累了豐富的實戰經驗,并善于總結,貢獻了多篇技術文章廣受好評。研究興趣是網絡安全、爬蟲技術、數據分析、驅動開發等技術。
本文摘編自《Python爬蟲開發與項目實戰》,經出版方授權發布。
延伸閱讀《Python爬蟲開發與項目實戰》
點擊上圖了解及購買
轉載請聯系微信:DoctorData
推薦語:零基礎學習爬蟲技術,從Python和Web前端基礎開始講起,由淺入深,包含大量案例,實用性強。
據統計,99%的大咖都完成了這個神操作
▼
更多精彩
在公眾號后臺對話框輸入以下關鍵詞
查看更多優質內容!
PPT?|?報告?|?讀書?|?書單?|?干貨?
大數據?|?揭秘?|?Python?|?可視化
AI?|?人工智能?|?5G?|?區塊鏈
機器學習?|?深度學習?|?神經網絡
1024?|?段子?|?數學?|?高考
猜你想看
深度學習高能干貨:手把手教你搭建MXNet框架
手把手教你用OpenCV實現機器學習最簡單的k-NN算法(附代碼)
41款實用工具,數據獲取、清洗、建模、可視化都有了
你是怎樣“被平均”的?細數統計數據中的那些坑
Q:?你還知道哪些爬蟲神技?
歡迎留言與大家分享
覺得不錯,請把這篇文章分享給你的朋友
轉載 / 投稿請聯系:baiyu@hzbook.com
更多精彩,請在后臺點擊“歷史文章”查看
點擊閱讀原文,了解更多
總結
以上是生活随笔為你收集整理的想用Python爬小姐姐图片?那你得先搞定分布式进程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 50年后的地球什么样?大数据、AI、量子
- 下一篇: websocket python爬虫_p