python3异步协程爬虫_Python实现基于协程的异步爬虫
Python實現基于協程的異步爬蟲
一、課程介紹
1. 課程來源
本課程核心部分來自《500 lines or less》項目,作者是來自 MongoDB 的工程師 A. Jesse Jiryu Davis 與 Python 之父 Guido van Rossum。項目代碼使用 MIT 協議,項目文檔使用?http://creativecommons.org/licenses/by/3.0/legalcode?協議。
課程內容在原文檔基礎上做了稍許修改,增加了部分原理介紹,步驟的拆解分析及源代碼注釋。
2. 內容簡介
傳統計算機科學往往將大量精力放在如何追求更有效率的算法上。但如今大部分涉及網絡的程序,它們的時間開銷主要并不是在計算上,而是在維持多個Socket連接上。亦或是它們的事件循環處理的不夠高效導致了更多的時間開銷。對于這些程序來說,它們面臨的挑戰是如何更高效地等待大量的網絡事件并進行調度。目前流行的解決方式就是使用異步I/O。
本課程將探討幾種實現爬蟲的方法,從傳統的線程池到使用協程,每節課實現一個小爬蟲。另外學習協程的時候,我們會從原理入手,以ayncio協程庫為原型,實現一個簡單的異步編程模型。
本課程實現的爬蟲為爬一個整站的爬蟲,不會爬到站點外面去,且功能較簡單,主要目的在于學習原理,提供實現并發與異步的思路,并不適合直接改寫作為日常工具使用。
3. 課程知識點
本課程項目完成過程中,我們將學習:
線程池實現并發爬蟲
回調方法實現異步爬蟲
協程技術的介紹
一個基于協程的異步編程模型
協程實現異步爬蟲
二、實驗環境
本課程使用Python 3.4,所以本課程內運行py腳本都是使用python3命令。
打開終端,進入?Code?目錄,創建?crawler?文件夾, 并將其作為我們的工作目錄。
$ cd Code
$ mkdir crawler && cd crawler
環保起見,測試爬蟲的網站在本地搭建。
我們使用 Python 2.7 版本官方文檔作為測試爬蟲用的網站
wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip
unzip python-doc.zip
安裝serve,一個用起來很方便的靜態文件服務器:
sudo npm install -g serve
啟動服務器:
serve python-doc
如果訪問不了npm的資源,也可以用以下方式開啟服務器:
ruby -run -ehttpd python-doc -p 3000
訪問localhost:3000查看網站:
三、實驗原理
什么是爬蟲?
網絡爬蟲(又被稱為網頁蜘蛛,網絡機器人,在FOAF社區中間,更經常的稱為網頁追逐者),是一種按照一定的規則,自動地抓取萬維網信息的程序或者腳本。
爬蟲的工作流程
網絡爬蟲基本的工作流程是從一個根URL開始,抓取頁面,解析頁面中所有的URL,將還沒有抓取過的URL放入工作隊列中,之后繼續抓取工作隊列中的URL,重復抓取、解析,將解析到的url放入工作隊列的步驟,直到工作隊列為空為止。
線程池、回調、協程
我們希望通過并發執行來加快爬蟲抓取頁面的速度。一般的實現方式有三種:
線程池方式:開一個線程池,每當爬蟲發現一個新鏈接,就將鏈接放入任務隊列中,線程池中的線程從任務隊列獲取一個鏈接,之后建立socket,完成抓取頁面、解析、將新連接放入工作隊列的步驟。
回調方式:程序會有一個主循環叫做事件循環,在事件循環中會不斷獲得事件,通過在事件上注冊解除回調函數來達到多任務并發執行的效果。缺點是一旦需要的回調操作變多,代碼就會非常散,變得難以維護。
協程方式:同樣通過事件循環執行程序,利用了Python?的生成器特性,生成器函數能夠中途停止并在之后恢復,那么原本不得不分開寫的回調函數就能夠寫在一個生成器函數中了,這也就實現了協程。
四、實驗一:線程池實現爬蟲
使用socket抓取頁面需要先建立連接,之后發送GET類型的HTTP報文,等待讀入,將讀到的所有內容存入響應緩存。
def fetch(url):
sock = socket.socket()
sock.connect(('localhost.com', 3000))
request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = parse_links(response)
q.add(links)
默認的socket連接與讀寫是阻塞式的,在等待讀入的這段時間的CPU占用是被完全浪費的。
多線程
默認這部分同學們都是學過的,所以就粗略記幾個重點,沒學過的同學可以直接參考廖雪峰的教程:廖雪峰的官方網站-Python多線程
導入線程庫:
import threading
開啟一個線程的方法:
t = 你新建的線程
t.start() #開始運行線程
t.join() #你的當前函數就阻塞在這一步直到線程運行完
建立線程的兩種方式:
#第一種:通過函數創建線程
def 函數a():
pass
t = threading.Thread(target=函數a,name=自己隨便取的線程名字)
#第二種:繼承線程類
class Fetcher(threading.Thread):
def __init__(self):
Thread.__init__(self):
#加這一步后主程序中斷退出后子線程也會跟著中斷退出
self.daemon = True
def run(self):
#線程運行的函數
pass
t = Fetcher()
線程同時操作一個全局變量時會產生線程競爭所以需要鎖:
lock = threading.Lock()
lock.acquire() #獲得鎖
#..操作全局變量..
lock.release() #釋放鎖
多線程同步-隊列
多線程同步就是多個線程競爭一個全局變量時按順序讀寫,一般情況下要用鎖,但是使用標準庫里的Queue的時候它內部已經實現了鎖,不用程序員自己寫了。
導入隊列類:
from queue import Queue
創建一個隊列:
q = Queue(maxsize=0)
maxsize為隊列大小,為0默認隊列大小可無窮大。
隊列是先進先出的數據結構:
q.put(item) #往隊列添加一個item,隊列滿了則阻塞
q.get(item) #從隊列得到一個item,隊列為空則阻塞
還有相應的不等待的版本,這里略過。
隊列不為空,或者為空但是取得item的線程沒有告知任務完成時都是處于阻塞狀態
q.join() #阻塞直到所有任務完成
線程告知任務完成使用task_done
q.task_done() #在線程內調用
實現線程池
創建thread.py文件作為爬蟲程序的文件。
我們使用seen_urls來記錄已經解析到的url地址:
seen_urls = set(['/'])
創建Fetcher類:
class Fetcher(Thread):
def __init__(self, tasks):
Thread.__init__(self)
#tasks為任務隊列
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
url = self.tasks.get()
print(url)
sock = socket.socket()
sock.connect(('localhost', 3000))
get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
sock.send(get.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
#解析頁面上的所有鏈接
links = self.parse_links(url, response)
lock.acquire()
#得到新鏈接加入任務隊列與seen_urls中
for link in links.difference(seen_urls):
self.tasks.put(link)
seen_urls.update(links)
lock.release()
#通知任務隊列這個線程的任務完成了
self.tasks.task_done()
使用正則庫與url解析庫來解析抓取的頁面,這里圖方便用了正則,同學也可以用Beautifulsoup等專門用來解析頁面的Python庫:
import urllib.parse
import re
在Fetcher中實現parse_links解析頁面:
def parse_links(self, fetched_url, response):
if not response:
print('error: {}'.format(fetched_url))
return set()
if not self._is_html(response):
return set()
#通過href屬性找到所有鏈接
urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
self.body(response)))
links = set()
for url in urls:
#可能找到的url是相對路徑,這時候就需要join一下,絕對路徑的話就還是會返回url
normalized = urllib.parse.urljoin(fetched_url, url)
#url的信息會被分段存在parts里
parts = urllib.parse.urlparse(normalized)
if parts.scheme not in ('', 'http', 'https'):
continue
host, port = urllib.parse.splitport(parts.netloc)
if host and host.lower() not in ('localhost'):
continue
#有的頁面會通過地址里的#frag后綴在頁面內跳轉,這里去掉frag的部分
defragmented, frag = urllib.parse.urldefrag(parts.path)
links.add(defragmented)
return links
#得到報文的html正文
def body(self, response):
body = response.split(b'\r\n\r\n', 1)[1]
return body.decode('utf-8')
def _is_html(self, response):
head, body = response.split(b'\r\n\r\n', 1)
headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
return headers.get('Content-Type', '').startswith('text/html')
實現線程池類與main的部分:
class ThreadPool:
def __init__(self, num_threads):
self.tasks = Queue()
for _ in range(num_threads):
Fetcher(self.tasks)
def add_task(self, url):
self.tasks.put(url)
def wait_completion(self):
self.tasks.join()
if __name__ == '__main__':
start = time.time()
#開4個線程
pool = ThreadPool(4)
#從根地址開始抓取頁面
pool.add_task("/")
pool.wait_completion()
print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
運行效果
這里先貼出完整代碼:
from queue import Queue
from threading import Thread, Lock
import urllib.parse
import socket
import re
import time
seen_urls = set(['/'])
lock = Lock()
class Fetcher(Thread):
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
url = self.tasks.get()
print(url)
sock = socket.socket()
sock.connect(('localhost', 3000))
get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
sock.send(get.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = self.parse_links(url, response)
lock.acquire()
for link in links.difference(seen_urls):
self.tasks.put(link)
seen_urls.update(links)
lock.release()
self.tasks.task_done()
def parse_links(self, fetched_url, response):
if not response:
print('error: {}'.format(fetched_url))
return set()
if not self._is_html(response):
return set()
urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
self.body(response)))
links = set()
for url in urls:
normalized = urllib.parse.urljoin(fetched_url, url)
parts = urllib.parse.urlparse(normalized)
if parts.scheme not in ('', 'http', 'https'):
continue
host, port = urllib.parse.splitport(parts.netloc)
if host and host.lower() not in ('localhost'):
continue
defragmented, frag = urllib.parse.urldefrag(parts.path)
links.add(defragmented)
return links
def body(self, response):
body = response.split(b'\r\n\r\n', 1)[1]
return body.decode('utf-8')
def _is_html(self, response):
head, body = response.split(b'\r\n\r\n', 1)
headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
return headers.get('Content-Type', '').startswith('text/html')
class ThreadPool:
def __init__(self, num_threads):
self.tasks = Queue()
for _ in range(num_threads):
Fetcher(self.tasks)
def add_task(self, url):
self.tasks.put(url)
def wait_completion(self):
self.tasks.join()
if __name__ == '__main__':
start = time.time()
pool = ThreadPool(4)
pool.add_task("/")
pool.wait_completion()
print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
運行python3 thread.py命令查看效果(記得先開網站服務器):
使用標準庫中的線程池
線程池直接使用multiprocessing.pool中的ThreadPool:
代碼更改如下:
from multiprocessing.pool import ThreadPool
#...省略中間部分...
#...去掉Fetcher初始化中的self.start()
#...刪除自己實現的ThreadPool...
if __name__ == '__main__':
start = time.time()
pool = ThreadPool()
tasks = Queue()
tasks.put("/")
Workers = [Fetcher(tasks) for i in range(4)]
pool.map_async(lambda w:w.run(), Workers)
tasks.join()
pool.close()
print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
使用ThreadPool時,它處理的對象可以不是線程對象,實際上Fetcher的線程部分ThreadPool根本用不到。因為它自己內部已開了幾個線程在等待任務輸入。這里偷個懶就只把self.start()去掉了。可以把Fetcher的線程部分全去掉,效果是一樣的。
ThreadPool活用了map函數,這里它將每一個Fetcher對象分配給線程池中的一個線程,線程調用了Fetcher的run函數。這里使用map_async是因為不希望它在那一步阻塞,我們希望在任務隊列join的地方阻塞,那么到隊列為空且任務全部處理完時程序就會繼續執行了。
運行python3 thread.py命令查看效果:
線程池實現的缺陷
我們希望爬蟲的性能能夠進一步提升,但是我們沒辦法開太多的線程,因為線程的內存開銷很大,每創建一個線程可能需要占用50k的內存。以及還有一點,網絡程序的時間開銷往往花在I/O上,socket I/O 阻塞時的那段時間是完全被浪費了的。那么要如何解決這個問題呢?
下節課你就知道啦,下節課見~
總結
以上是生活随笔為你收集整理的python3异步协程爬虫_Python实现基于协程的异步爬虫的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东到家如何开电子发票(汉典京字的基本解
- 下一篇: python函数和模块的使用方法_Pyt