Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis
先一個一個java程序,讀取日志文件中的數據,然后將數據寫入到Kafka中,然后寫一個SparkSteaming程序,使用直連的方式讀取Kafka中的數據,計算如下指標
該文件是一個電商網站某一天用戶購買商品的訂單成交數據,每一行有多個字段,用空格分割,字段的含義如下
用戶ID ? ip地址 ? ? ? ? ?商品分類 ? 購買明細 ? ? 商品金額
A ? ? ? ?202.106.196.115 手機 ? ? ? iPhone8 ? ? ?8000
0? ? ? ? ? 1? ? ? ? ? ? ? ? ? ?2? ? ? ? 3? ? ? ? ? 4
A 202.106.196.115 手機 iPhone8 8000
B 202.106.0.20 服裝 布萊奧尼西服 199
C 202.102.152.3 家具 嬰兒床 2000
D 202.96.96.68 家電 電飯鍋 1000
F 202.98.0.68 化妝品 迪奧香水 200
H 202.96.75.68 食品 奶粉 600
J 202.97.229.133 圖書 Hadoop編程指南 90
A 202.106.196.115 手機 手機殼 200
B 202.106.0.20 手機 iPhone8 8000
C 202.102.152.3 家具 嬰兒車 2000
D 202.96.96.68 家具 嬰兒車 1000
F 202.98.0.68 化妝品 迪奧香水 200
H 202.96.75.68 食品 嬰兒床 600
J 202.97.229.133 圖書 spark實戰 80
問題1.計算出各個省的成交量總額(結果保存到MySQL中)
問題2.計算每個省城市成交量的top3(結果保存到MySQL中)
問題3.計算每個商品分類的成交總額,并按照從高到低排序(結果保存到MySQL中)
問題4.構建每一個用戶的用戶畫像,就是根據用戶購買的具體商品,給用戶打上一個標簽,為將來的商品推薦系統作數據支撐
說明:如果一個用戶購買了一個iPhone8,對應有多個標簽:果粉、高端人士、數碼一族
請將下面的規則數據保存到MySQL數據庫中,并作為標簽規則(三個字段分別代表id、商品、對于的標簽):
1 iPhone8 果粉
2 iPhone8 高端人士
3 iPhone8 數碼一族
4 布萊奧尼西服 高端人士
5 布萊奧尼西服 商務男士
6 嬰兒床 育兒中
7 迪奧香水 高端人士
8 迪奧香水 白富美
9 嬰兒床 育兒中
10 iPhone8手機殼 果粉
11 iPhone8手機殼 高端人士
12 iPhone8手機殼 數碼一族
13 spark實戰 IT人士
14 spark實戰 屌絲
15 Hadoop編程指南 IT人士
16 Hadoop編程指南 屌絲
用戶的行為數據,根據規則打上對應的標簽,然后將數據存儲到Hbase中,并說明Hbase的注解和列族的設計思想!
由于用戶的行為過多,計算過程要對數據進行序列化的壓縮,要求使用kryo這種序列化機制,壓縮方式自己選擇
?
?
object OrderCount {def main(args: Array[String]): Unit = {val group = "g1"val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")val ssc = new StreamingContext(conf, Duration(5000))val broadcastRef = IPUtils.broadcastIpRules(ssc, "/ip/ip.txt")val topic = "orders"val brokerList = "lj01:9092,lj02:9092,lj03:9092"val zkQuorum = "lj01:2181,lj02:2181,lj03:2181"val topics: Set[String] = Set(topic)val topicDirs = new ZKGroupTopicDirs(group, topic)val zkTopicPath = s"${topicDirs.consumerOffsetDir}"val kafkaParams = Map("metadata.broker.list" -> brokerList,"group.id" -> group,"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString)val zkClient = new ZkClient(zkQuorum)val children = zkClient.countChildren(zkTopicPath)var kafkaStream: InputDStream[(String, String)] = nullvar fromOffsets: Map[TopicAndPartition, Long] = Map()//如果保存過 offset//注意:偏移量的查詢是在Driver完成的if (children > 0) {for (i <- 0 until children) {val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")val tp = TopicAndPartition(topic, i)fromOffsets += (tp -> partitionOffset.toLong)}val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}var offsetRanges = Array[OffsetRange]()//kafkaStream.foreachRDD里面的業務邏輯是在Driver端執行,RDD在Driver端生成,RDD調算子,算子里得函數的執行是在ExecutorkafkaStream.foreachRDD { kafkaRDD =>if(!kafkaRDD.isEmpty()) {offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRangesval lines: RDD[String] = kafkaRDD.map(_._2)//整理數據val fields: RDD[Array[String]] = lines.map(_.split(" "))//計算成交總金額CalculateUtil.calculateIncome(fields)//計算商品分類金額CalculateUtil.calculateItem(fields)//計算區域成交金額CalculateUtil.calculateZone(fields, broadcastRef)for (o <- offsetRanges) {val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}}}ssc.start()ssc.awaitTermination()} }實現要在redis設置一個key,變量
object Constant {val TOTAL_INCOME = "TOTAL_INCOME" }?工具類CalculateUtil?
object CalculateUtil {//計算成交總金額def calculateIncome(fields: RDD[Array[String]]) = {//將數據計算后寫入到Reidsval priceRDD: RDD[Double] = fields.map(arr => {val price = arr(4).toDoubleprice})//reduce是一個Action,會把結果返回到Driver端//將當前批次的總金額返回了val sum: Double = priceRDD.reduce(_+_)//獲取一個jedis連接val conn = JedisConnectionPool.getConnection()//將歷史值和當前的值進行累加conn.incrByFloat(Constant.TOTAL_INCOME, sum)//釋放連接conn.close()}// 計算分類的成交金額def calculateItem(fields: RDD[Array[String]]) = {val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {//分類val item = arr(2)//金額val parice = arr(4).toDouble(item, parice)})//按商品分類進行聚合val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_+_)//將當前批次的數據累加到Redis中//foreachPartition是一個Action//現在這種方式,jeids的連接是在哪一端創建的(Driver)//在Driver端拿Jedis連接不好reduced.foreachPartition(part => {//獲取一個Jedis連接//這個連接其實是在Executor中的獲取的//JedisConnectionPool在一個Executor進程中有幾個實例(單例)val conn = JedisConnectionPool.getConnection()part.foreach(t => {//一個連接更新多條數據conn.incrByFloat(t._1, t._2)})//將當前分區中的數據跟新完在關閉連接conn.close()})}//根據Ip計算歸屬地def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {val ip = arr(1)val price = arr(4).toDoubleval ipNum = MyUtils.ip2Long(ip)//在Executor中獲取到廣播的全部規則val allRules: Array[(Long, Long, String)] = broadcastRef.value//二分法查找val index = MyUtils.binarySearch(allRules, ipNum)var province = "未知"if (index != -1) {province = allRules(index)._3}//省份,訂單金額(province, price)})//按省份進行聚合val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_+_)//將數據跟新到Redisreduced.foreachPartition(part => {val conn = JedisConnectionPool.getConnection()part.foreach(t => {conn.incrByFloat(t._1, t._2)})conn.close()})} }即Myutilt
object MyUtils {def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum = fragments(i).toLong | ipNum << 8L}ipNum}def readRules(path: String): Array[(Long, Long, String)] = {//讀取ip規則val bf: BufferedSource = Source.fromFile(path)val lines: Iterator[String] = bf.getLines()//對ip規則進行整理,并放入到內存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一個迭代器代表一個分區,分區中有多條數據//先獲得一個JDBC連接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")//將數據通過Connection寫入到數據庫val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//將分區中的數據一條一條寫入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//將分區中的數據全部寫完之后,在關閉連接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//數據是在內存中val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")//將ip地址轉換成十進制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根據腳本到rules中查找對應的數據val tp = rules(index)val province = tp._3println(province)} }?
總結
以上是生活随笔為你收集整理的Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 原创超简单代码(1.19)
- 下一篇: spark streamming + k