第48讲:分布式利器 Scrapy-Redis 原理
在上節課我們提到過,Scrapy-Redis 庫已經為我們提供了 Scrapy 分布式的隊列、調度器、去重等功能,其 GitHub 地址為: https://github.com/rmax/scrapy-redis。
本節課我們深入掌握利用 Redis 實現 Scrapy 分布式的方法,并深入了解 Scrapy-Redis 的原理。
1.獲取源碼
可以把源碼克隆下來,執行如下命令:
git clone https://github.com/rmax/scrapy-redis.git
核心源碼在 scrapy-redis/src/scrapy_redis 目錄下。
2.爬取隊列
我們從爬取隊列入手,來看看它的具體實現。源碼文件為 queue.py,它包含了三個隊列的實現,首先它實現了一個父類 Base,提供一些基本方法和屬性,如下所示:
class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): if serializer is None: serializer = picklecompat if not hasattr(serializer, 'loads'): raise TypeError("serializer does not implement 'loads' function: % r" % serializer) if not hasattr(serializer, 'dumps'): raise TypeError("serializer '% s' does not implement 'dumps' function: % r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer ? def _encode_request(self, request): obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) ? def _decode_request(self, encoded_request): obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) ? def __len__(self): """Return the length of the queue""" raise NotImplementedError ? def push(self, request): """Push a request""" raise NotImplementedError ? def pop(self, timeout=0): """Pop a request""" raise NotImplementedError ? def clear(self): """Clear queue/stack""" self.server.delete(self.key)首先看一下 _encode_request 和 _decode_request 方法,因為我們需要把一個 Request 對象存儲到數據庫中,但數據庫無法直接存儲對象,所以需要將 Request 序列轉化成字符串再存儲。
而這兩個方法分別是序列化和反序列化的操作,利用 pickle 庫來實現,一般在調用 push 將 Request 存入數據庫時會調用 _encode_request 方法進行序列化,在調用 pop 取出 Request 的時候會調用 _decode_request 進行反序列化。
在父類中 len、push 和 pop 方法都是未實現的,會直接拋出 NotImplementedError,因此是不能直接使用這個類的,必須實現一個子類來重寫這三個方法,而不同的子類就會有不同的實現,也就有著不同的功能。
接下來我們就需要定義一些子類來繼承 Base 類,并重寫這幾個方法,那在源碼中就有三個子類的實現,它們分別是 FifoQueue、PriorityQueue、LifoQueue,我們分別來看下它們的實現原理。
首先是 FifoQueue:
class FifoQueue(Base): """Per-spider FIFO queue""" ? def __len__(self): """Return the length of the queue""" return self.server.llen(self.key) ? def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) ? def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)可以看到這個類繼承了 Base 類,并重寫了 len、push、pop 這三個方法,在這三個方法中都是對 server 對象的操作,而 server 對象就是一個 Redis 連接對象,我們可以直接調用其操作 Redis 的方法對數據庫進行操作,可以看到這里的操作方法有 llen、lpush、rpop 等,這就代表此爬取隊列是使用的 Redis 的列表。
序列化后的 Request 會被存入列表中,就是列表的其中一個元素,len 方法是獲取列表的長度,push 方法中調用了 lpush 操作,這代表從列表左側存入數據,pop 方法中調用了 rpop 操作,這代表從列表右側取出數據。
所以 Request 在列表中的存取順序是左側進、右側出,所以這是有序的進出,即先進先出,英文叫作 First Input First Output,也被簡稱為 FIFO,而此類的名稱就叫作 FifoQueue。
另外還有一個與之相反的實現類,叫作 LifoQueue,實現如下:
class LifoQueue(Base): """Per-spider LIFO queue.""" ? def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) ? def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) ? def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) ? if data: return self._decode_request(data)與 FifoQueue 不同的就是它的 pop 方法,在這里使用的是 lpop 操作,也就是從左側出,而 push 方法依然是使用的 lpush 操作,是從左側入。
那么這樣達到的效果就是先進后出、后進先出,英文叫作 Last In First Out,簡稱為 LIFO,而此類名稱就叫作 LifoQueue。同時這個存取方式類似棧的操作,所以其實也可以稱作 StackQueue。
另外在源碼中還有一個子類實現,叫作 PriorityQueue,顧名思義,它叫作優先級隊列,實現如下:
class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" ? def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key) ? def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data) ? def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])在這里我們可以看到 len、push、pop 方法中使用了 server 對象的 zcard、zadd、zrange 操作,可以知道這里使用的存儲結果是有序集合 Sorted Set,在這個集合中每個元素都可以設置一個分數,那么這個分數就代表優先級。
在 len 方法里調用了 zcard 操作,返回的就是有序集合的大小,也就是爬取隊列的長度,在 push 方法中調用了 zadd 操作,就是向集合中添加元素,這里的分數指定成 Request 的優先級的相反數,因為分數低的會排在集合的前面,所以這里高優先級的 Request 就會存在集合的最前面。
pop 方法是首先調用了 zrange 操作取出了集合的第一個元素,因為最高優先級的 Request 會存在集合最前面,所以第一個元素就是最高優先級的 Request,然后再調用 zremrangebyrank 操作將這個元素刪除,這樣就完成了取出并刪除的操作。
此隊列是默認使用的隊列,也就是爬取隊列默認是使用有序集合來存儲的。
3.去重過濾
前面說過 Scrapy 的去重是利用集合來實現的,而在 Scrapy 分布式中的去重就需要利用共享的集合,那么這里使用的就是 Redis 中的集合數據結構。我們來看看去重類是怎樣實現的,源碼文件是 dupefilter.py,其內實現了一個 RFPDupeFilter 類,如下所示:
class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True ? @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) ? @classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) ? def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0 ? def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request ? Returns ------- str ? """ return request_fingerprint(request) ? def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear() ? def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) ? def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request) s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request) s" "- no more duplicates will be shown" "(see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False這里同樣實現了一個 request_seen 方法,和 Scrapy 中的 request_seen 方法實現極其類似。不過這里集合使用的是 server 對象的 sadd 操作,也就是集合不再是一個簡單數據結構了,而是直接換成了數據庫的存儲方式。
鑒別重復的方式還是使用指紋,指紋同樣是依靠 request_fingerprint 方法來獲取的。獲取指紋之后就直接向集合添加指紋,如果添加成功,說明這個指紋原本不存在于集合中,返回值 1。代碼中最后的返回結果是判定添加結果是否為 0,如果剛才的返回值為 1,那這個判定結果就是 False,也就是不重復,否則判定為重復。
這樣我們就成功利用 Redis 的集合完成了指紋的記錄和重復的驗證。
4.調度器
Scrapy-Redis 還幫我們實現了配合 Queue、DupeFilter 使用的調度器 Scheduler,源文件名稱是 scheduler.py。我們可以指定一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取開始的時候清空爬取隊列,SCHEDULER_PERSIST 即是否在爬取結束后保持爬取隊列不清除。我們可以在 settings.py 里自由配置,而此調度器很好地實現了對接。
接下來我們看看兩個核心的存取方法,實現如下所示:
def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True ? def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return requestenqueue_request 可以向隊列中添加 Request,核心操作就是調用 Queue 的 push 操作,還有一些統計和日志操作。next_request 就是從隊列中取 Request,核心操作就是調用 Queue 的 pop 操作,此時如果隊列中還有 Request,則 Request 會直接取出來,爬取繼續,否則如果隊列為空,爬取則會重新開始。
5.總結
那么到現在為止我們就把之前所說的三個分布式的問題解決了,總結如下:
-
爬取隊列的實現,在這里提供了三種隊列,使用了 Redis 的列表或有序集合來維護。
-
去重的實現,使用了 Redis 的集合來保存 Request 的指紋以提供重復過濾。
-
中斷后重新爬取的實現,中斷后 Redis 的隊列沒有清空,再次啟動時調度器的 next_request 會從隊列中取到下一個 Request,繼續爬取。
6.結語
以上內容便是 Scrapy-Redis 的核心源碼解析。Scrapy-Redis 中還提供了 Spider、Item Pipeline 的實現,不過它們并不是必須使用的。
在下一節,我們會將 Scrapy-Redis 集成到之前所實現的 Scrapy 項目中,實現多臺主機協同爬取。
總結
以上是生活随笔為你收集整理的第48讲:分布式利器 Scrapy-Redis 原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第17讲:aiohttp 异步爬虫实战
- 下一篇: 第49讲:实战上手,Scrapy-Red