2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
目錄
案例-SogouQ日志分析
業務需求
準備工作
HanLP?中文分詞
樣例類 SogouRecord
業務實現
???????搜索關鍵詞統計
???????用戶搜索點擊統計
???????搜索時間段統計
???????完整代碼
案例-SogouQ日志分析
使用搜狗實驗室提供【用戶查詢日志(SogouQ)】數據,使用Spark框架,將數據封裝到RDD中進行業務數據處理分析。數據網址:http://www.sogou.com/labs/resource/q.php
?1)、數據介紹:搜索引擎查詢日志庫設計為包括約1個月(2008年6月)Sogou搜索引擎部分網頁查詢需求及用戶點擊情況的網頁查詢日志數據集合。
?2)、數據格式
訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL
?
?
?
?
用戶ID是根據用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應同一個用戶ID
?3)、數據下載:分為三個數據集,大小不一樣
迷你版(樣例數據, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
精簡版(1天數據,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip
?
業務需求
針對SougoQ用戶查詢日志數據中不同字段,不同業務進行統計分析:
?
使用SparkContext讀取日志數據,封裝到RDD數據集中,調用Transformation函數和Action函數處理分析,靈活掌握Scala語言編程。
?
準備工作
?????在編程實現業務功能之前,首先考慮如何對【查詢詞】進行中文分詞及將日志數據解析封裝。
HanLP?中文分詞
????使用比較流行好用中文分詞:HanLP,面向生產環境的自然語言處理工具包,HanLP 是由一系列模型與算法組成的 Java 工具包,目標是普及自然語言處理在生產環境中的應用。
官方網站:http://www.hanlp.com/,添加Maven依賴
<dependency><groupId>com.hankcs</groupId><artifactId>hanlp</artifactId><version>portable-1.7.7</version></dependency>
演示范例:HanLP 入門案例,基本使用
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizerimport scala.collection.JavaConverters._/*** HanLP 入門案例,基本使用*/
object HanLPTest {def main(args: Array[String]): Unit = {// 入門Demoval terms: util.List[Term] = HanLP.segment("杰克奧特曼全集視頻")println(terms)println(terms.asScala.map(_.word.trim))// 標準分詞val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重陽")println(terms1)println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))val words: Array[String] ="""00:00:00 2982199073774412 ???[360安全衛士] ??8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""".split("\\s+")println(words(2).replaceAll("\\[|\\]", ""))//將"["和"]"替換為空""}}
?
???????樣例類 SogouRecord
將每行日志數據封裝到CaseClass樣例類SogouRecord中,方便后續處理:
/*** 用戶搜索點擊網頁記錄Record* @param queryTime ?訪問時間,格式為:HH:mm:ss* @param userId ????用戶ID* @param queryWords 查詢詞* @param resultRank 該URL在返回結果中的排名* @param clickRank ?用戶點擊的順序號* @param clickUrl ??用戶點擊的URL*/
case class SogouRecord(queryTime: String, userId: String, queryWords: String, resultRank: Int, clickRank: Int, clickUrl: String )
?
???????業務實現
先讀取數據,封裝到SougoRecord類中,再按照業務處理數據。
最后也可以將分析的結果存儲到MySQL表中。
???????讀取數據
?????構建SparkContext實例對象,讀取本次SogouQ.sample數據,封裝到SougoRecord中 。
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地讀取SogouQ用戶查詢日志數據val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析數據,封裝到CaseClass樣例類中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 過濾不合法數據,如null,分割后長度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 對每個分區中數據進行解析,封裝到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 數據使用多次,進行緩存操作,使用count觸發recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()
?
???????搜索關鍵詞統計
獲取用戶【查詢詞】,使用HanLP進行分詞,按照單詞分組聚合統計出現次數,類似WordCount程序,具體代碼如下:
// =================== 3.1 搜索關鍵詞統計?===================
// a. 獲取搜索詞,進行中文分詞
val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分詞庫進行分詞val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 將Java中集合對轉換為Scala中集合對象import scala.collection.JavaConverters._terms.asScala.map(_.word)})
})
println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 統計搜索詞出現次數,獲取次數最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每個單詞出現一次.reduceByKey(_ + _) // 分組統計次數.map(_.swap).sortByKey(ascending = false) // 詞頻降序排序.take(10) // 獲取前10個搜索詞
top10SearchWords.foreach(println)
運行結果如下:
?
?
???????用戶搜索點擊統計
統計出每個用戶每個搜索詞點擊網頁的次數,可以作為搜索引擎搜索效果評價指標。先按照用戶ID分組,再按照【查詢詞】分組,最后統計次數,求取最大次數、最小次數及平均次數。
// =================== 3.2 用戶搜索點擊次數統計?===================
/*每個用戶在搜索引擎輸入關鍵詞以后,統計點擊網頁數目,反應搜索引擎準確度先按照用戶ID分組,再按照搜索詞分組,統計出每個用戶每個搜索詞點擊網頁個數*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 獲取用戶ID和搜索詞val key = (record.userId, record.queryWords)(key, 1)})// 按照用戶ID和搜索詞組合的Key分組聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")
程序運行結果如下:
?
???????搜索時間段統計
按照【訪問時間】字段獲取【小時:分鐘】,分組統計各個小時段用戶查詢搜索的數量,進一步觀察用戶喜歡在哪些時間段上網,使用搜狗引擎搜索,代碼如下:
// =================== 3.3 搜索時間段統計?===================
/*從搜索時間字段獲取小時,統計個小時搜索次數*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小時和分鐘.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分組聚合.map((_, 1)) // 每個單詞出現一次.reduceByKey(_ + _) // 分組統計次數.sortBy(_._2, ascending = false)
hourSearchRDD.foreach(println)
??程序運行結果如下:
?
???????完整代碼
業務實現完整代碼SogouQueryAnalysis如下所示:
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}/*** 用戶查詢日志(SogouQ)分析,數據來源Sogou搜索引擎部分網頁查詢需求及用戶點擊情況的網頁查詢日志數據集合。* ????1. 搜索關鍵詞統計,使用HanLP中文分詞* ????2. 用戶搜索次數統計* ????3. 搜索時間段統計* 數據格式:* 訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL* 其中,用戶ID是根據用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應同一個用戶ID*/
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地讀取SogouQ用戶查詢日志數據val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//val rawLogsRDD: RDD[String] = sc.textFile("D:/data/sogou/SogouQ.reduced")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析數據,封裝到CaseClass樣例類中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 過濾不合法數據,如null,分割后長度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 對每個分區中數據進行解析,封裝到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println("====解析數據===")println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 數據使用多次,進行緩存操作,使用count觸發recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()// TODO: 3. 依據需求統計分析/*1. 搜索關鍵詞統計,使用HanLP中文分詞2. 用戶搜索次數統計3. 搜索時間段統計*/println("====3.1 搜索關鍵詞統計===")// =================== 3.1 搜索關鍵詞統計?===================// a. 獲取搜索詞,進行中文分詞val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分詞庫進行分詞val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 將Java中集合對轉換為Scala中集合對象import scala.collection.JavaConverters._terms.asScala.map(_.word)})})//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 統計搜索詞出現次數,獲取次數最多Top10val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每個單詞出現一次.reduceByKey(_ + _) // 分組統計次數.map(_.swap).sortByKey(ascending = false) // 詞頻降序排序.take(10) // 獲取前10個搜索詞top10SearchWords.foreach(println)println("====3.2 用戶搜索點擊次數統計===")// =================== 3.2 用戶搜索點擊次數統計?===================/*每個用戶在搜索引擎輸入關鍵詞以后,統計點擊網頁數目,反應搜索引擎準確度先按照用戶ID分組,再按照搜索詞分組,統計出每個用戶每個搜索詞點擊網頁個數*/val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 獲取用戶ID和搜索詞val key = (record.userId, record.queryWords)(key, 1)})// 按照用戶ID和搜索詞組合的Key分組聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")println("====3.3 搜索時間段統計===")// =================== 3.3 搜索時間段統計?===================/*從搜索時間字段獲取小時,統計個小時搜索次數*/val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小時和分鐘.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分組聚合.map((_, 1)) // 每個單詞出現一次.reduceByKey(_ + _) // 分組統計次數.sortBy(_._2, ascending = false)hourSearchRDD.foreach(println)// 釋放緩存數據recordsRDD.unpersist()// 應用結束,關閉資源sc.stop()}/*** 用戶搜索點擊網頁記錄Record** @param queryTime ?訪問時間,格式為:HH:mm:ss* @param userId ????用戶ID* @param queryWords 查詢詞* @param resultRank 該URL在返回結果中的排名* @param clickRank ?用戶點擊的順序號* @param clickUrl ??用戶點擊的URL*/case class SogouRecord(queryTime: String,userId: String,queryWords: String,resultRank: Int,clickRank: Int,clickUrl: String)}
???????
總結
以上是生活随笔為你收集整理的2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十):Sp
- 下一篇: 2021年大数据Spark(二十二):内