日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储...

發(fā)布時間:2025/4/5 数据库 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

http://blog.51cto.com/xpleaf/2093952

1 概述

在不用爬蟲框架的情況,經(jīng)過多方學(xué)習(xí),嘗試實現(xiàn)了一個分布式爬蟲系統(tǒng),并且可以將數(shù)據(jù)保存到不同地方,類似MySQL、HBase等。

基于面向接口的編碼思想來開發(fā),因此這個系統(tǒng)具有一定的擴展性,有興趣的朋友直接看一下代碼,就能理解其設(shè)計思想,雖然代碼目前來說很多地方還是比較緊耦合,但只要花些時間和精力,很多都是可抽取出來并且可配置化的。

因為時間的關(guān)系,我只寫了京東和蘇寧易購兩個網(wǎng)站的爬蟲,但是完全可以實現(xiàn)不同網(wǎng)站爬蟲的隨機調(diào)度,基于其代碼結(jié)構(gòu),再寫國美、天貓等的商品爬取,難度不大,但是估計需要花很多時間和精力。因為在解析網(wǎng)頁的數(shù)據(jù)時,實際上需要花很多時間,比如我在爬取蘇寧易購商品的價格時,價格是異步獲取的,并且其api是一長串的數(shù)字組合,我花了幾個小時的時間才發(fā)現(xiàn)其規(guī)律,當(dāng)然也承認(rèn),我的經(jīng)驗不足。

這個系統(tǒng)的設(shè)計,除了基本的數(shù)據(jù)爬取以外,更關(guān)注以下幾個方面的問題:

  • 1.如何實現(xiàn)分布式,同一個程序打包后分發(fā)到不同的節(jié)點運行時,不影響整體的數(shù)據(jù)爬取
  • 2.如何實現(xiàn)url隨機循環(huán)調(diào)度,核心是針對不同的頂級域名做隨機
  • 3.如何定時向url倉庫中添加種子url,達到不讓爬蟲系統(tǒng)停下來的目的
  • 4.如何實現(xiàn)對爬蟲節(jié)點程序的監(jiān)控,并能夠發(fā)郵件報警
  • 5.如何實現(xiàn)一個隨機IP代理庫,目的跟第2點有點類似,都是為了反反爬蟲

下面會針對這個系統(tǒng)來做一個整體的基本介紹,其實我在代碼中都有非常詳細(xì)的注釋,有興趣的朋友可以參考一下代碼,最后我會給出一些我爬蟲時的數(shù)據(jù)分析。

另外需要注意的是,這個爬蟲系統(tǒng)是基于Java實現(xiàn)的,但是語言本身仍然不是最重要的,有興趣的朋友可以嘗試用Python實現(xiàn)。

2 分布式爬蟲系統(tǒng)架構(gòu)

整體系統(tǒng)架構(gòu)如下:

所以從上面的架構(gòu)可以看出,整個系統(tǒng)主要分為三個部分:

  • 爬蟲系統(tǒng)
  • URL調(diào)度系統(tǒng)
  • 監(jiān)控報警系統(tǒng)

爬蟲系統(tǒng)就是用來爬取數(shù)據(jù)的,因為系統(tǒng)設(shè)計為分布式,因此,爬蟲程序本身可以運行在不同的服務(wù)器節(jié)點上。

url調(diào)度系統(tǒng)核心在于url倉庫,所謂的url倉庫其實就是用Redis保存了需要爬取的url列表,并且在我們的url調(diào)度器中根據(jù)一定的策略來消費其中的url,從這個角度考慮,url倉庫其實也是一個url隊列。

監(jiān)控報警系統(tǒng)主要是對爬蟲節(jié)點進行監(jiān)控,雖然并行執(zhí)行的爬蟲節(jié)點中的某一個掛掉了對整體數(shù)據(jù)爬取本身沒有影響(只是降低了爬蟲的速度),但是我們還是希望知道能夠主動接收到節(jié)點掛掉的通知,而不是被動地發(fā)現(xiàn)。

下面將會針對以上三個方面并結(jié)合部分代碼片段來對整個系統(tǒng)的設(shè)計思路做一些基本的介紹,對系統(tǒng)完整實現(xiàn)有濃厚興趣的朋友可以直接參考源代碼。

3 爬蟲系統(tǒng)

(說明:zookeeper監(jiān)控屬于監(jiān)控報警系統(tǒng),url調(diào)度器屬于URL調(diào)度系統(tǒng))

爬蟲系統(tǒng)是一個獨立運行的進程,我們把我們的爬蟲系統(tǒng)打包成jar包,然后分發(fā)到不同的節(jié)點上執(zhí)行,這樣并行爬取數(shù)據(jù)可以提高爬蟲的效率。

3.1 隨機IP代理器

加入隨機IP代理主要是為了反反爬蟲,因此如果有一個IP代理庫,并且可以在構(gòu)建http客戶端時可以隨機地使用不同的代理,那么對我們進行反反爬蟲則會有很大的幫助。

在系統(tǒng)中使用IP代理庫,需要先在文本文件中添加可用的代理地址信息:

# IPProxyRepository.txt 58.60.255.104:8118 219.135.164.245:3128 27.44.171.27:9999 219.135.164.245:3128 58.60.255.104:8118 58.252.6.165:9000 ......

需要注意的是,上面的代理IP是我在西刺代理上拿到的一些代理IP,不一定可用,建議是自己花錢購買一批代理IP,這樣可以節(jié)省很多時間和精力去尋找代理IP。

然后在構(gòu)建http客戶端的工具類中,當(dāng)?shù)谝淮问褂霉ぞ哳悤r,會把這些代理IP加載進內(nèi)存中,加載到Java的一個HashMap:

// IP地址代理庫Map private static Map<String, Integer> IPProxyRepository = new HashMap<>(); private static String[] keysArray = null; // keysArray是為了方便生成隨機的代理對象 /** * 初次使用時使用靜態(tài)代碼塊將IP代理庫加載進set中 */ static { InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加載包含代理IP的文本 // 構(gòu)建緩沖流對象 InputStreamReader isr = new InputStreamReader(in); BufferedReader bfr = new BufferedReader(isr); String line = null; try { // 循環(huán)讀每一行,添加進map中 while ((line = bfr.readLine()) != null) { String[] split = line.split(":"); // 以:作為分隔符,即文本中的數(shù)據(jù)格式應(yīng)為192.168.1.1:4893 String host = split[0]; int port = Integer.valueOf(split[1]); IPProxyRepository.put(host, port); } Set<String> keys = IPProxyRepository.keySet(); keysArray = keys.toArray(new String[keys.size()]); // keysArray是為了方便生成隨機的代理對象 } catch (IOException e) { e.printStackTrace(); } }

之后,在每次構(gòu)建http客戶端時,都會先到map中看是否有代理IP,有則使用,沒有則不使用代理:

CloseableHttpClient httpClient = null; HttpHost proxy = null; if (IPProxyRepository.size() > 0) { // 如果ip代理地址庫不為空,則設(shè)置代理 proxy = getRandomProxy(); httpClient = HttpClients.custom().setProxy(proxy).build(); // 創(chuàng)建httpclient對象 } else { httpClient = HttpClients.custom().build(); // 創(chuàng)建httpclient對象 } HttpGet request = new HttpGet(url); // 構(gòu)建htttp get請求 ......

隨機代理對象則通過下面的方法生成:

/*** 隨機返回一個代理對象** @return*/ public static HttpHost getRandomProxy() { // 隨機獲取host:port,并構(gòu)建代理對象 Random random = new Random(); String host = keysArray[random.nextInt(keysArray.length)]; int port = IPProxyRepository.get(host); HttpHost proxy = new HttpHost(host, port); // 設(shè)置http代理 return proxy; }

這樣,通過上面的設(shè)計,基本就實現(xiàn)了隨機IP代理器的功能,當(dāng)然,其中還有很多可以完善的地方,比如,當(dāng)使用這個IP代理而請求失敗時,是否可以把這一情況記錄下來,當(dāng)超過一定次數(shù)時,再將其從代理庫中刪除,同時生成日志供開發(fā)人員或運維人員參考,這是完全可以實現(xiàn)的,不過我就不做這一步功能了。

3.2 網(wǎng)頁下載器

網(wǎng)頁下載器就是用來下載網(wǎng)頁中的數(shù)據(jù),主要基于下面的接口開發(fā):

/*** 網(wǎng)頁數(shù)據(jù)下載*/ public interface IDownload { /** * 下載給定url的網(wǎng)頁數(shù)據(jù) * @param url * @return */ public Page download(String url); }

基于此,在系統(tǒng)中只實現(xiàn)了一個http get的下載器,但是也可以完成我們所需要的功能了:

/*** 數(shù)據(jù)下載實現(xiàn)類*/ public class HttpGetDownloadImpl implements IDownload { @Override public Page download(String url) { Page page = new Page(); String content = HttpUtil.getHttpContent(url); // 獲取網(wǎng)頁數(shù)據(jù) page.setUrl(url); page.setContent(content); return page; } }

3.3 網(wǎng)頁解析器

網(wǎng)頁解析器就是把下載的網(wǎng)頁中我們感興趣的數(shù)據(jù)解析出來,并保存到某個對象中,供數(shù)據(jù)存儲器進一步處理以保存到不同的持久化倉庫中,其基于下面的接口進行開發(fā):

/*** 網(wǎng)頁數(shù)據(jù)解析*/ public interface IParser { public void parser(Page page); }

網(wǎng)頁解析器在整個系統(tǒng)的開發(fā)中也算是比較重頭戲的一個組件,功能不復(fù)雜,主要是代碼比較多,針對不同的商城不同的商品,對應(yīng)的解析器可能就不一樣了,因此需要針對特別的商城的商品進行開發(fā),因為很顯然,京東用的網(wǎng)頁模板跟蘇寧易購的肯定不一樣,天貓用的跟京東用的也肯定不一樣,所以這個完全是看自己的需要來進行開發(fā)了,只是說,在解析器開發(fā)的過程當(dāng)中會發(fā)現(xiàn)有部分重復(fù)代碼,這時就可以把這些代碼抽象出來開發(fā)一個工具類了。

目前在系統(tǒng)中爬取的是京東和蘇寧易購的手機商品數(shù)據(jù),因此與就寫了這兩個實現(xiàn)類:

/*** 解析京東商品的實現(xiàn)類*/ public class JDHtmlParserImpl implements IParser { ...... } /** * 蘇寧易購網(wǎng)頁解析 */ public class SNHtmlParserImpl implements IParser { ...... }

3.4 數(shù)據(jù)存儲器

數(shù)據(jù)存儲器主要是將網(wǎng)頁解析器解析出來的數(shù)據(jù)對象保存到不同的,而對于本次爬取的手機商品,數(shù)據(jù)對象是下面一個Page對象:

/*** 網(wǎng)頁對象,主要包含網(wǎng)頁內(nèi)容和商品數(shù)據(jù)*/ public class Page { private String content; // 網(wǎng)頁內(nèi)容 private String id; // 商品Id private String source; // 商品來源 private String brand; // 商品品牌 private String title; // 商品標(biāo)題 private float price; // 商品價格 private int commentCount; // 商品評論數(shù) private String url; // 商品地址 private String imgUrl; // 商品圖片地址 private String params; // 商品規(guī)格參數(shù) private List<String> urls = new ArrayList<>(); // 解析列表頁面時用來保存解析的商品url的容器 }

對應(yīng)的,在MySQL中,表數(shù)據(jù)結(jié)構(gòu)如下:

-- ---------------------------- -- Table structure for phone -- ---------------------------- DROP TABLE IF EXISTS `phone`; CREATE TABLE `phone` ( `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id', `source` varchar(30) NOT NULL COMMENT '商品來源,如jd suning gome等', `brand` varchar(30) DEFAULT NULL COMMENT '手機品牌', `title` varchar(255) DEFAULT NULL COMMENT '商品頁面的手機標(biāo)題', `price` float(10,2) DEFAULT NULL COMMENT '手機價格', `comment_count` varchar(30) DEFAULT NULL COMMENT '手機評論', `url` varchar(500) DEFAULT NULL COMMENT '手機詳細(xì)信息地址', `img_url` varchar(500) DEFAULT NULL COMMENT '圖片地址', `params` text COMMENT '手機參數(shù),json格式存儲', PRIMARY KEY (`id`,`source`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

而在HBase中的表結(jié)構(gòu)則為如下:

## cf1 存儲 id source price comment brand url ## cf2 存儲 title params imgUrl create 'phone', 'cf1', 'cf2' ## 在HBase shell中查看創(chuàng)建的表 hbase(main):135:0> desc 'phone' Table phone is ENABLED phone COLUMN FAMILIES DESCRIPTION {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 2 row(s) in 0.0350 seconds

即在HBase中建立了兩個列族,分別為cf1、cf2,其中cf1用來保存id source price comment brand url字段信息,cf2用來保存title params imgUrl字段信息。

不同的數(shù)據(jù)存儲用的是不同的實現(xiàn)類,但是其都是基于下面同一個接口開發(fā)的:

/*** 商品數(shù)據(jù)的存儲*/ public interface IStore { public void store(Page page); }

然后基于此開發(fā)了MySQL的存儲實現(xiàn)類、HBase的存儲實現(xiàn)類還有控制臺的輸出實現(xiàn)類,如MySQL的存儲實現(xiàn)類,其實就是簡單的數(shù)據(jù)插入語句:

/*** 使用dbc數(shù)據(jù)庫連接池將數(shù)據(jù)寫入mysql表中*/ public class MySQLStoreImpl implements IStore { private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource()); @Override public void store(Page page) { String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)"; try { queryRunner.update(sql, page.getId(), page.getSource(), page.getBrand(), page.getTitle(), page.getPrice(), page.getCommentCount(), page.getUrl(), page.getImgUrl(), page.getParams()); } catch (SQLException e) { e.printStackTrace(); } } }

而HBase的存儲實現(xiàn)類,則是HBase Java API的常用插入語句代碼:

...... // cf1:price Put pricePut = new Put(rowKey); // 必須要做是否為null判斷,否則會有空指針異常 pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes()); puts.add(pricePut); // cf1:comment Put commentPut = new Put(rowKey); commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes()); puts.add(commentPut); // cf1:brand Put brandPut = new Put(rowKey); brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes()); puts.add(brandPut); ......

當(dāng)然,至于要將數(shù)據(jù)存儲在哪個地方,在初始化爬蟲程序時,是可以手動選擇的:

// 3.注入存儲器 iSpider.setStore(new HBaseStoreImpl());

目前還沒有把代碼寫成可以同時存儲在多個地方,按照目前代碼的架構(gòu),要實現(xiàn)這一點也比較簡單,修改一下相應(yīng)代碼就好了。實際上,是可以先把數(shù)據(jù)保存到MySQL中,然后通過Sqoop導(dǎo)入到HBase中,詳細(xì)操作可以參考我寫的Sqoop文章。

仍然需要注意的是,如果確定需要將數(shù)據(jù)保存到HBase中,請保證你有可用的集群環(huán)境,并且需要將如下配置文檔添加到classpath下:

core-site.xml hbase-site.xml hdfs-site.xml

對大數(shù)據(jù)感興趣的同學(xué)可以折騰一下這一點,如果之前沒有接觸過的,直接使用MySQL存儲就好了,只需要在初始化爬蟲程序時注入MySQL存儲器即可:

// 3.注入存儲器 iSpider.setStore(new MySQLStoreImpl());

4 URL調(diào)度系統(tǒng)

URL調(diào)度系統(tǒng)是實現(xiàn)整個爬蟲系統(tǒng)分布式的橋梁與關(guān)鍵,正是通過URL調(diào)度系統(tǒng)的使用,才使得整個爬蟲系統(tǒng)可以較為高效(Redis作為存儲)隨機地獲取url,并實現(xiàn)整個系統(tǒng)的分布式。

4.1 URL倉庫

通過架構(gòu)圖可以看出,所謂的URL倉庫不過是Redis倉庫,即在我們的系統(tǒng)中使用Redis來保存url地址列表,正是這樣,才能保證我們的程序?qū)崿F(xiàn)分布式,只要保存了url是唯一的,這樣不管我們的爬蟲程序有多少個,最終保存下來的數(shù)據(jù)都是只有唯一一份的,而不會重復(fù),是通過這樣來實現(xiàn)分布式的。

同時url倉庫中的url地址在獲取時的策略是通過隊列的方式來實現(xiàn)的,待會通過URL調(diào)度器的實現(xiàn)即可知道。

另外,在我們的url倉庫中,主要保存了下面的數(shù)據(jù):

  • 種子URL列表

Redis的數(shù)據(jù)類型為list。

種子URL是持久化存儲的,一定時間后,由URL定時器通過種子URL獲取URL,并將其注入到我們的爬蟲程序需要使用的高優(yōu)先級URL隊列中,這樣就可以保存我們的爬蟲程序可以源源不斷地爬取數(shù)據(jù)而不需要中止程序的執(zhí)行。

  • 高優(yōu)先級URL隊列

Redis的數(shù)據(jù)類型為set。

什么是高優(yōu)先級URL隊列?其實它就是用來保存列表url的。

那么什么是列表url呢?

說白了就是一個列表中含有多個商品,以京東為列,我們打開一個手機列表為例:

該地址中包含的不是一個具體商品的url,而是包含了多個我們需要爬取的數(shù)據(jù)(手機商品)的列表,通過對每個高級url的解析,我們可以獲取到非常多的具體商品url,而具體的商品url,就是低優(yōu)先url,其會保存到低優(yōu)先級URL隊列中。

那么以這個系統(tǒng)為例,保存的數(shù)據(jù)類似如下:

jd.com.higher--https://list.jd.com/list.html?cat=9987,653,655&page=1... suning.com.higher--https://list.suning.com/0-20006-0.html...
  • 低優(yōu)先級URL隊列

Redis的數(shù)據(jù)類型為set。

低優(yōu)先級URL其實就是具體某個商品的URL,如下面一個手機商品:

通過下載該url的數(shù)據(jù),并對其進行解析,就能夠獲取到我們想要的數(shù)據(jù)。

那么以這個系統(tǒng)為例,保存的數(shù)據(jù)類似如下:

jd.com.lower--https://item.jd.com/23545806622.html... suning.com.lower--https://product.suning.com/0000000000/690128156.html...

4.2 URL調(diào)度器

所謂url調(diào)度器,其實說白了就是url倉庫java代碼的調(diào)度策略,不過因為其核心在于調(diào)度,所以將其放到URL調(diào)度器中來進行說明,目前其調(diào)度基于以下接口開發(fā):

/*** url 倉庫* 主要功能:* 向倉庫中添加url(高優(yōu)先級的列表,低優(yōu)先級的商品url)* 從倉庫中獲取url(優(yōu)先獲取高優(yōu)先級的url,如果沒有,再獲取低優(yōu)先級的url)**/ public interface IRepository { /** * 獲取url的方法 * 從倉庫中獲取url(優(yōu)先獲取高優(yōu)先級的url,如果沒有,再獲取低優(yōu)先級的url) * @return */ public String poll(); /** * 向高優(yōu)先級列表中添加商品列表url * @param highUrl */ public void offerHigher(String highUrl); /** * 向低優(yōu)先級列表中添加商品url * @param lowUrl */ public void offerLower(String lowUrl); }

其基于Redis作為URL倉庫的實現(xiàn)如下:

/*** 基于Redis的全網(wǎng)爬蟲,隨機獲取爬蟲url:** Redis中用來保存url的數(shù)據(jù)結(jié)構(gòu)如下:* 1.需要爬取的域名集合(存儲數(shù)據(jù)類型為set,這個需要先在Redis中添加)* key* spider.website.domains* value(set)* jd.com suning.com gome.com* key由常量對象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 獲得* 2.各個域名所對應(yīng)的高低優(yōu)先url隊列(存儲數(shù)據(jù)類型為list,這個由爬蟲程序解析種子url后動態(tài)添加)* key* jd.com.higher* jd.com.lower* suning.com.higher* suning.com.lower* gome.com.higher* gome.come.lower* value(list)* 相對應(yīng)需要解析的url列表* key由隨機的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX獲得* 3.種子url列表* key* spider.seed.urls* value(list)* 需要爬取的數(shù)據(jù)的種子url* key由常量SpiderConstants.SPIDER_SEED_URLS_KEY獲得** 種子url列表中的url會由url調(diào)度器定時向高低優(yōu)先url隊列中*/ public class RandomRedisRepositoryImpl implements IRepository { /** * 構(gòu)造方法 */ public RandomRedisRepositoryImpl() { init(); } /** * 初始化方法,初始化時,先將redis中存在的高低優(yōu)先級url隊列全部刪除 * 否則上一次url隊列中的url沒有消耗完時,再停止啟動跑下一次,就會導(dǎo)致url倉庫中有重復(fù)的url */ public void init() { Jedis jedis = JedisUtil.getJedis(); Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); String higherUrlKey; String lowerUrlKey; for(String domain : domains) { higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; jedis.del(higherUrlKey, lowerUrlKey); } JedisUtil.returnJedis(jedis); } /** * 從隊列中獲取url,目前的策略是: * 1.先從高優(yōu)先級url隊列中獲取 * 2.再從低優(yōu)先級url隊列中獲取 * 對應(yīng)我們的實際場景,應(yīng)該是先解析完列表url再解析商品url * 但是需要注意的是,在分布式多線程的環(huán)境下,肯定是不能完全保證的,因為在某個時刻高優(yōu)先級url隊列中 * 的url消耗完了,但實際上程序還在解析下一個高優(yōu)先級url,此時,其它線程去獲取高優(yōu)先級隊列url肯定獲取不到 * 這時就會去獲取低優(yōu)先級隊列中的url,在實際考慮分析時,這點尤其需要注意 * @return */ @Override public String poll() { // 從set中隨機獲取一個頂級域名 Jedis jedis = JedisUtil.getJedis(); String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher String url = jedis.lpop(key); if(url == null) { // 如果為null,則從低優(yōu)先級中獲取 key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower url = jedis.lpop(key); } JedisUtil.returnJedis(jedis); return url; } /** * 向高優(yōu)先級url隊列中添加url * @param highUrl */ @Override public void offerHigher(String highUrl) { offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX); } /** * 向低優(yōu)先url隊列中添加url * @param lowUrl */ @Override public void offerLower(String lowUrl) { offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX); } /** * 添加url的通用方法,通過offerHigher和offerLower抽象而來 * @param url 需要添加的url * @param urlTypeSuffix url類型后綴.higher或.lower */ public void offerUrl(String url, String urlTypeSuffix) { Jedis jedis = JedisUtil.getJedis(); String domain = SpiderUtil.getTopDomain(url); // 獲取url對應(yīng)的頂級域名,如jd.com String key = domain + urlTypeSuffix; // 拼接url隊列的key,如jd.com.higher jedis.lpush(key, url); // 向url隊列中添加url JedisUtil.returnJedis(jedis); } }

通過代碼分析也是可以知道,其核心就在如何調(diào)度url倉庫(Redis)中的url。

4.3 URL定時器

一段時間后,高優(yōu)先級URL隊列和低優(yōu)先URL隊列中的url都會被消費完,為了讓程序可以繼續(xù)爬取數(shù)據(jù),同時減少人為的干預(yù),可以預(yù)先在Redis中插入種子url,之后定時讓URL定時器從種子url中取出url定存放到高優(yōu)先級URL隊列中,以此達到程序定時不間斷爬取數(shù)據(jù)的目的。

url消費完畢后,是否需要循環(huán)不斷爬取數(shù)據(jù)根據(jù)個人業(yè)務(wù)需求而不同,因此這一步不是必需的,只是也提供了這樣的操作。因為事實上,我們需要爬取的數(shù)據(jù)也是每隔一段時間就會更新的,如果希望我們爬取的數(shù)據(jù)也跟著定時更新,那么這時定時器就有非常重要的作用了。不過需要注意的是,一旦決定需要循環(huán)重復(fù)爬取數(shù)據(jù),則在設(shè)計存儲器實現(xiàn)時需要考慮重復(fù)數(shù)據(jù)的問題,即重復(fù)數(shù)據(jù)應(yīng)該是更新操作,目前在我設(shè)計的存儲器不包括這個功能,有興趣的朋友可以自己實現(xiàn),只需要在插入數(shù)據(jù)前判斷數(shù)據(jù)庫中是否存在該數(shù)據(jù)即可。

另外需要注意的一點是,URL定時器是一個獨立的進程,需要單獨啟動。

定時器基于Quartz實現(xiàn),下面是其job的代碼:

/*** 每天定時從url倉庫中獲取種子url,添加進高優(yōu)先級列表*/ public class UrlJob implements Job { // log4j日志記錄 private Logger logger = LoggerFactory.getLogger(UrlJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { /** * 1.從指定url種子倉庫獲取種子url * 2.將種子url添加進高優(yōu)先級列表 */ Jedis jedis = JedisUtil.getJedis(); Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis數(shù)據(jù)類型為set,防止重復(fù)添加種子url for(String seedUrl : seedUrls) { String domain = SpiderUtil.getTopDomain(seedUrl); // 種子url的頂級域名 jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl); logger.info("獲取種子:{}", seedUrl); } JedisUtil.returnJedis(jedis); // System.out.println("Scheduler Job Test..."); } }

調(diào)度器的實現(xiàn)如下:

/*** url定時調(diào)度器,定時向url對應(yīng)倉庫中存放種子url** 業(yè)務(wù)規(guī)定:每天凌晨1點10分向倉庫中存放種子url*/ public class UrlJobScheduler { public UrlJobScheduler() { init(); } /** * 初始化調(diào)度器 */ public void init() { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // 如果沒有以下start方法的執(zhí)行,則是不會開啟任務(wù)的調(diào)度 scheduler.start(); String name = "URL_SCHEDULER_JOB"; String group = "URL_SCHEDULER_JOB_GROUP"; JobDetail jobDetail = new JobDetail(name, group, UrlJob.class); String cronExpression = "0 10 1 * * ?"; Trigger trigger = new CronTrigger(name, group, cronExpression); // 調(diào)度任務(wù) scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } public static void main(String[] args) { UrlJobScheduler urlJobScheduler = new UrlJobScheduler(); urlJobScheduler.start(); } /** * 定時調(diào)度任務(wù) * 因為我們每天要定時從指定的倉庫中獲取種子url,并存放到高優(yōu)先級的url列表中 * 所以是一個不間斷的程序,所以不能停止 */ private void start() { while (true) { } } }

5 監(jiān)控報警系統(tǒng)

監(jiān)控報警系統(tǒng)的加入主要是為了讓使用者可以主動發(fā)現(xiàn)節(jié)點宕機,而不是被動地發(fā)現(xiàn),因為實際中爬蟲程序可能是持續(xù)不斷運行的,并且我們會在多個節(jié)點上部署我們的爬蟲程序,因此很有必要對節(jié)點進行監(jiān)控,并且在節(jié)點出現(xiàn)問題時可以及時發(fā)現(xiàn)并修正,需要注意的是,監(jiān)控報警系統(tǒng)是一個獨立的進程,需要單獨啟動。

5.1 基本原理

首先需要先在zookeeper中創(chuàng)建一個/ispider節(jié)點:

[zk: localhost:2181(CONNECTED) 1] create /ispider ispider Created /ispider

監(jiān)控報警系統(tǒng)的開發(fā)主要依賴于zookeeper實現(xiàn),監(jiān)控程序?qū)ookeeper下面的這個節(jié)點目錄進行監(jiān)聽:

[zk: localhost:2181(CONNECTED) 0] ls /ispider []

爬蟲程序啟動時會在該節(jié)點目錄下注冊一個臨時節(jié)點目錄:

[zk: localhost:2181(CONNECTED) 0] ls /ispider [192.168.43.166]

當(dāng)節(jié)點出現(xiàn)宕機時,該臨時節(jié)點目錄就會被zookeeper刪除

[zk: localhost:2181(CONNECTED) 0] ls /ispider []

同時因為我們監(jiān)聽了節(jié)點目錄/ispider,所以當(dāng)zookeeper刪除其下的節(jié)點目錄時(或增加一個節(jié)點目錄),zookeeper會給我們的監(jiān)控程序發(fā)送通知,即我們的監(jiān)控程序會得到回調(diào),這樣便可以在回調(diào)程序中執(zhí)行報警的系統(tǒng)動作,從而完成監(jiān)控報警的功能。

5.2 zookeeper Java API使用說明

可以使用zookeeper原生的Java API,我在另外寫的一個RPC框架(底層基于Netty實現(xiàn)遠程通信)中就是使用原生的API,不過顯然代碼會復(fù)雜很多,并且本身需要對zookeeper有更多的學(xué)習(xí)和了解,這樣用起來才會容易一些。

所以為了降低開發(fā)的難度,這里使用第三方封裝的API,即curator,來進行zookeeper客戶端程序的開發(fā)。

5.3 爬蟲系統(tǒng)zookeeper注冊

在啟動爬蟲系統(tǒng)時,我們的程序都會啟動一個zookeeper客戶端來向zookeeper來注冊自身的節(jié)點信息,主要是ip地址,并在/ispider節(jié)點目錄以創(chuàng)建一個以該爬蟲程序所在的節(jié)點IP地址命名的節(jié)點,如/ispider/192.168.43.116,實現(xiàn)的代碼如下:

/*** 注冊zk*/ private void registerZK() { String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; int baseSleepTimeMs = 1000; int maxRetries = 3; RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); curator.start(); String ip = null; try { // 向zk的具體目錄注冊 寫節(jié)點 創(chuàng)建節(jié)點 ip = InetAddress.getLocalHost().getHostAddress(); curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes()); } catch (UnknownHostException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }

應(yīng)該注意到的是,我們創(chuàng)建的節(jié)點為臨時節(jié)點,要想實現(xiàn)監(jiān)控報警功能,必須要為臨時節(jié)點。

5.4 監(jiān)控程序

首先需要先監(jiān)聽zookeeper中的一個節(jié)點目錄,在我們的系統(tǒng)中,設(shè)計是監(jiān)聽/ispider這個節(jié)點目錄:

public SpiderMonitorTask() {String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; int baseSleepTimeMs = 1000; int maxRetries = 3; RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); curator.start(); try { previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider"); } catch (Exception e) { e.printStackTrace(); } }

在上面注冊了zookeeper中的watcher,也就是接收通知的回調(diào)程序,在該程序中,執(zhí)行我們報警的邏輯:

/*** 這個方法,當(dāng)監(jiān)控的zk對應(yīng)的目錄一旦有變動,就會被調(diào)用* 得到當(dāng)前最新的節(jié)點狀態(tài),將最新的節(jié)點狀態(tài)和初始或者上一次的節(jié)點狀態(tài)作比較,那我們就知道了是由誰引起的節(jié)點變化* @param event*/ @Override public void process(WatchedEvent event) { try { List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider"); // HashSet<String> previousNodesSet = new HashSet<>(previousNodes); if(currentNodes.size() > previousNodes.size()) { // 最新的節(jié)點服務(wù),超過之前的節(jié)點服務(wù)個數(shù),有新的節(jié)點增加進來 for(String node : currentNodes) { if(!previousNodes.contains(node)) { // 當(dāng)前節(jié)點就是新增節(jié)點 logger.info("----有新的爬蟲節(jié)點{}新增進來", node); } } } else if(currentNodes.size() < previousNodes.size()) { // 有節(jié)點掛了 發(fā)送告警郵件或者短信 for(String node : previousNodes) { if(!currentNodes.contains(node)) { // 當(dāng)前節(jié)點掛掉了 得需要發(fā)郵件 logger.info("----有爬蟲節(jié)點{}掛掉了", node); MailUtil.sendMail("有爬蟲節(jié)點掛掉了,請人工查看爬蟲節(jié)點的情況,節(jié)點信息為:", node); } } } // 掛掉和新增的數(shù)目一模一樣,上面是不包括這種情況的,有興趣的朋友可以直接實現(xiàn)包括這種特殊情況的監(jiān)控 previousNodes = currentNodes; // 更新上一次的節(jié)點列表,成為最新的節(jié)點列表 } catch (Exception e) { e.printStackTrace(); } // 在原生的API需要再做一次監(jiān)控,因為每一次監(jiān)控只會生效一次,所以當(dāng)上面發(fā)現(xiàn)變化后,需要再監(jiān)聽一次,這樣下一次才能監(jiān)聽到 // 但是在使用curator的API時則不需要這樣做 }

當(dāng)然,判斷節(jié)點是否掛掉,上面的邏輯還是存在一定的問題的,按照上面的邏輯,假如某一時刻新增節(jié)點和刪除節(jié)點事件同時發(fā)生,那么其就不能判斷出來,所以如果需要更精準(zhǔn)的話,可以將上面的程序代碼修改一下。

5.5 郵件發(fā)送模塊

使用模板代碼就可以了,不過需要注意的是,在使用時,發(fā)件人的信息請使用自己的郵箱。

下面是爬蟲節(jié)點掛掉時接收到的郵件:

實際上,如果購買了短信服務(wù),那么通過短信API也可以向我們的手機發(fā)送短信。

6 實戰(zhàn):爬取京東、蘇寧易購全網(wǎng)手機商品數(shù)據(jù)

因為前面在介紹這個系統(tǒng)的時候也提到了,我只寫了京東和蘇寧易購的網(wǎng)頁解析器,所以接下來也就是爬取其全網(wǎng)的手機商品數(shù)據(jù)。

6.1 環(huán)境說明

需要確保Redis、Zookeeper服務(wù)可用,另外如果需要使用HBase來存儲數(shù)據(jù),需要確保Hadoop集群中的HBase可用,并且相關(guān)配置文件已經(jīng)加入到爬蟲程序的classpath中。

還有一點需要注意的是,URL定時器和監(jiān)控報警系統(tǒng)是作為單獨的進程來運行的,并且也是可選的。

6.2 爬蟲結(jié)果

進行了兩次爬取,分別嘗試將數(shù)據(jù)保存到MySQL和HBase中,給出如下數(shù)據(jù)情況。

6.2.1 保存到MySQL

mysql> select count(*) from phone; +----------+ | count(*) | +----------+ | 12052 | +----------+ 1 row in setmysql> select count(*) from phone where source='jd.com'; +----------+ | count(*) | +----------+ | 9578 | +----------+ 1 row in setmysql> select count(*) from phone where source='suning .com'; +----------+ | count(*) | +----------+ | 2474 | +----------+ 1 row in set

在可視化工具中查看數(shù)據(jù)情況:

6.2.2 保存到HBase

hbase(main):225:0* count 'phone' Current count: 1000, row: 11155386088_jd.com Current count: 2000, row: 136191393_suning.com Current count: 3000, row: 16893837301_jd.com Current count: 4000, row: 19036619855_jd.com Current count: 5000, row: 1983786945_jd.com Current count: 6000, row: 1997392141_jd.com Current count: 7000, row: 21798495372_jd.com Current count: 8000, row: 24154264902_jd.com Current count: 9000, row: 25687565618_jd.com Current count: 10000, row: 26458674797_jd.com Current count: 11000, row: 617169906_suning.com Current count: 12000, row: 769705049_suning.com 12348 row(s) in 1.5720 seconds => 12348

在HDFS中查看數(shù)據(jù)情況:

6.2.3 數(shù)據(jù)量與實際情況分析

  • 京東

京東手機的列表大概有160多頁,每個列表有60個商品數(shù)據(jù),所以總量在9600左右,我們的數(shù)據(jù)基本是符合的,后面通過日志分析其實可以知道,一般丟失的數(shù)據(jù)為連接超時導(dǎo)致的,所以在選取爬蟲的環(huán)境時,更建議在網(wǎng)絡(luò)環(huán)境好的主機上進行,同時如果可以有IP代理地址庫就更好了,另外對于連接超時的情況,其實是可以進一步在我們的程序中加以控制,一旦出現(xiàn)爬取數(shù)據(jù)失敗的url,可以將其加入到重試url隊列中,目前這一點功能我是沒有做,有興趣的同學(xué)可以試一下。

  • 蘇寧易購

再來看看蘇寧的,其有100頁左右的手機列表,每頁也是60個商品數(shù)據(jù),所以總量在6000左右。但可以看到,我們的數(shù)據(jù)卻只有3000這樣的數(shù)量級(缺少的依然是頻繁爬取造成的連接失敗問題),這是為什么呢?

這是因為,打開蘇寧的某個列表頁面后,其是先加載30個商品,當(dāng)鼠標(biāo)向下滑動時,才會通過另外的API去加載其它的30個商品數(shù)據(jù),每一個列表頁面都是如此,所以,實際上,我們是缺少了一半的商品數(shù)據(jù)沒有爬取。知道這個原因之后,實現(xiàn)也不難,但是因為時間關(guān)系,我就沒有做了,有興趣的朋友折騰一下吧。

6.3 通過日志分析爬蟲系統(tǒng)的性能

在我們的爬蟲系統(tǒng)中,每個關(guān)鍵的地方,如網(wǎng)頁下載、數(shù)據(jù)解析等都是有打logger的,所以通過日志,可以大概分析出相關(guān)的時間參數(shù)。

2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗時長:590 ms,代理信息:null:null 2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表頁面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗時長:46ms 2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-0.html, 消耗時長:49ms 2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://item.jd.com/6737464.html,消耗時長:219 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗時長:276 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://list.suning.com/0-20006-99.html,消耗時長:300 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-99.html, 消耗時長:4ms ...... 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗時長:176 ms,代理信息:null:null 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品頁面:https://item.jd.com/23934388891.html, 消耗時長:413ms 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網(wǎng)頁:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗時長:308 ms,代理信息:null:null 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品頁面:https://product.suning.com/0070079092/10017793337.html, 消耗時長:588ms ......

平均下來,下載一個商品網(wǎng)頁數(shù)據(jù)的時間在200~500毫秒不等,當(dāng)然這個還需要取決于當(dāng)時的網(wǎng)絡(luò)情況。

另外,如果想要真正計算爬取一個商品的數(shù)據(jù),可以通過日志下面的數(shù)據(jù)來計算:

  • 下載一個商品頁面數(shù)據(jù)的時間
  • 獲取價格數(shù)據(jù)的時間
  • 獲取評論數(shù)據(jù)的時間

在我的主機上(CPU:E5 10核心,內(nèi)存:32GB,分別開啟1個虛擬機和3個虛擬機),情況如下:

節(jié)點數(shù)每節(jié)點線程數(shù)商品數(shù)量時間
15京東+蘇寧易購近13000個商品數(shù)據(jù)141分鐘
35京東+蘇寧易購近13000個商品數(shù)據(jù)65分鐘

可以看到,當(dāng)使用3個節(jié)點時,時間并不會相應(yīng)地縮小為原來的1/3,這是因為此時影響爬蟲性能的問題主要是網(wǎng)絡(luò)問題,節(jié)點數(shù)量多,線程數(shù)量大,網(wǎng)絡(luò)請求也多,但是帶寬一定,并且在沒有使用代理的情況,請求頻繁,連接失敗的情況也會增多,對時間也有一定的影響,如果使用隨機代理庫,情況將會好很多。

但可以肯定的是,在橫向擴展增加爬蟲節(jié)點之后,確實可以大大縮小我們的爬蟲時間,這也是分布式爬蟲系統(tǒng)的好處。

7 爬蟲系統(tǒng)中使用的反反爬蟲策略

在整個爬蟲系統(tǒng)的設(shè)計中,主要使用下面的策略來達到反反爬蟲的目的:

  • 使用代理來訪問-->IP代理庫,隨機IP代理
  • 隨機頂級域名url訪問-->url調(diào)度系統(tǒng)
  • 每個線程每爬取完一條商品數(shù)據(jù)sleep一小段時間再進行爬取

8 總結(jié)

需要說明的是,本系統(tǒng)是基于Java實現(xiàn)的,但個人覺得,語言本身依然不是問題,核心在于對整個系統(tǒng)的設(shè)計上以及理解上,寫此文章是希望分享這樣一種分布式爬蟲系統(tǒng)的架構(gòu)給大家,如果對源代碼感興趣,可以到我的GitHub上查看。

GitHub:https://github.com/xpleaf/ispider

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/8820037.html

總結(jié)

以上是生活随笔為你收集整理的分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。