日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

8. spark学习之旅(二)

發布時間:2023/12/8 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 8. spark学习之旅(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 1. 彈性分布式數據集RDD
    • 1.1. 什么是RDD
    • 1.2. RDD的屬性
    • 1.3. 通過RDD的轉換方式對RDD詳細解釋
    • 1.4. 如何創建RDD
    • 1.5. Transformation和Action詳解
    • 1.6. 常用的算子詳細解釋(==一天搞懂一個算子==)
    • 1.7. RDD的依賴關系和Stage劃分
    • 1.8 集群運行原理
    • 1.9. 緩存Cache設置和CheckPoint設置
  • 2. RDD小案例
    • 2.1. 快速輸出每個分區中的數據
    • 2.2. 服務器訪問日志根據ip地址查詢區域
  • 3. [三種任務提交流程standalone、yarn-cluster、yarn-client](https://www.cnblogs.com/lillcol/p/11159114.html)
    • 3.1. standalone模式
    • 3.2. Spark on Yarn
  • 4. 海闊憑魚躍,天高任鳥飛

1. 彈性分布式數據集RDD

1.1. 什么是RDD

  • RDD(Resilient Distributed Dataset)叫做分布式數據集,包含了只讀的、分區的、分布式計算的概念。
  • RDD是個類。
  • 1.2. RDD的屬性

  • 一個數據分區的列表(hdfs的所有數據塊的位置信息,保存在RDD類成員變量Array中)
    protected def getPartitions: Array[Partition]
  • 保存了在數據塊上的計算方法,這個計算方法會應用到每一個數據塊上,Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。
  • RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
  • 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于范圍RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
  • 一個列表,存儲存取每個Partition的優先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
  • 1.3. 通過RDD的轉換方式對RDD詳細解釋

    1.4. 如何創建RDD

    • 通過序列化集合的方式創建RDD(spark中makerdd和parallelize的區別)

      sc.parallelize(1 to 9, 2)
    • 由外部存儲系統的數據集創建(textFile),包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等

      val rdd2 = sc.textFile("hdfs://mini1:9000/a.txt")
    • 通過其他的RDD做transformation操作轉換成新的RDD

    1.5. Transformation和Action詳解

    • RDD中的所有轉換都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

    • 官網鏈接2.4.1版本

    • 博客鏈接

    1.6. 常用的算子詳細解釋(一天搞懂一個算子)

    • Transformation算子(懶加載)

    • Action算子(觸發任務的進行)

    1.7. RDD的依賴關系和Stage劃分

    • 寬依賴: 依賴的RDD產生的數據不只是給我的,父RDD不只包含一個子RDD的數據。
    • 窄依賴:依賴的RDD產生的數據只給我自己。父RDD只包含一個子RDD的數據。
    • 血統(Lineage):RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。
    • Stage劃分

    1.8 集群運行原理

    1.9. 緩存Cache設置和CheckPoint設置

    • 如何設置cache和checkpoint
      • cache:
        someRdd.cache(): 緩存到內存
        someRdd.persist(StorageLevel.MEMORY_AND_DISK): 根據自己的需要設置緩存的位置(內存和硬盤)
      • CheckPoint:可以把RDD計算后的數據緩存在本地磁盤,也可以是hdfs
        sc.setCheckpointDir(“hdfs://mini1:9000/checkpoint”)
        someRdd.checkpoint()
    • 什么時候設置cache,什么時候設置checkpoint
      • 通常:遇到寬依賴設置checkpoint,窄依賴想緩存的話設置cache
    • cache 和checkpoint的區別
      • cache只是緩存數據,不改變RDD的依賴關系
      • checkpoint是生成了一個新的RDD,后面的RDD依賴關系已經改變
      • RDD發生異常尋找數據的過程:checkpoint --> cache --> 重算

    2. RDD小案例

    2.1. 快速輸出每個分區中的數據

    val a = sc.parallelize(1 to 9, 3) a.glom.collect

    2.2. 服務器訪問日志根據ip地址查詢區域

    • 需求分析
      • 在互聯網中,我們經常會見到城市熱點圖這樣的報表數據,例如百度統計中,會統計今年的熱門旅游城市、熱門報考學校等,會將這樣的信息顯示在熱點圖中。因此,我們需要通過日志信息(運營商或者網站自己生成)和城市ip段信息來判斷用戶的ip段,統計熱點經緯度。
    • 數據下載 我的百度云提取碼;3rzp
      • 城市ip段信息
      • ip日志信息
    • 源代碼
      • gradle文件配置

        plugins {id 'java'id 'scala' }version '1.0.0'sourceCompatibility = 1.8 targetCompatibility = 1.8sourceSets {main {scala {srcDirs = ['src/main/scala', 'src/main/java']}java {srcDirs = []}}test {scala {srcDirs = ['src/test/scala', 'src/test/java']}java {srcDirs = []}} }repositories {mavenCentral()maven {url 'http://maven.aliyun.com/nexus/content/groups/public/'}maven {url 'https://maven.ibiblio.org/maven2/'} }dependencies {compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12' //scala基本庫compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.2'compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.19' }jar {manifest {attributes 'Main-Class': 'com.xiaofan.ip_location.IPLocation'} }
      • scala源碼邏輯

        package com.xiaofan.ip_locationimport java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** @author xiaofan*/ object IPLocation {val data2Mysql = (iterator: Iterator[(String, Int)]) => {var conn: Connection = nullvar ps: PreparedStatement = nullval sql = "INSERT INTO location_info (location, counts, access_date) VALUES (?, ?, ?)"try {conn = DriverManager.getConnection("jdbc:mysql://mini1:3306/db_ip_location?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8", "root", "123456")iterator.foreach(line => {ps = conn.prepareStatement(sql)ps.setString(1, line._1)ps.setInt(2, line._2)ps.setTimestamp(3, new Timestamp(System.currentTimeMillis()))ps.executeUpdate()})} catch {case e: Exception => println("Mysql Exception:" + e.toString)} finally {if (ps != null) {ps.close()}if (conn != null) {conn.close()}}}def ip2Long(ip: String): Long = {val fragments: Array[String] = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length) {ipNum = fragments(i).toLong | ipNum << 8L}ipNum}def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)) {return middle}if (ip < lines(middle)._1.toLong) {high = middle - 1} else {low = middle + 1}}-1}def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[5]").setAppName("ip_location")val sc = new SparkContext(conf)// 讀取基站數據val ipRulesArrayRDD: RDD[(String, String, String)] = sc.textFile("D:\\ip.txt").map(line => {val fields = line.split("\\|")val start_num = fields(2)val end_num = fields(3)val province = fields(6)(start_num, end_num, province)})// 全部的ip映射規則val ipRulesArray: Array[(String, String, String)] = ipRulesArrayRDD.collect()// 廣播規則val ipRulesBroadcast: Broadcast[Array[(String, String, String)]] = sc.broadcast(ipRulesArray)// 加載要處理的數據val ipsRDD: RDD[String] = sc.textFile("D:\\20090121000132.394251.http.format").map(line => {val fields = line.split("\\|")fields(1)})// 對數據進行處理,取的結果val result: RDD[(String, Int)] = ipsRDD.map(line => {val ipNum: Long = ip2Long(line)// 注意: 這里傳遞的是廣播的變量值val index: Int = binarySearch(ipRulesBroadcast.value, ipNum)// (ip的起始Num, ip的結束Num, 省份名)val info: (String, String, String) = ipRulesBroadcast.value(index)info}).map(t => (t._3, 1)).reduceByKey(_ + _)// 打印數據result.foreach(println)// 把數據寫入mysqlresult.foreachPartition(data2Mysql(_))sc.stop()} }
      • mysql表邏輯

        • 建表語句

          create table location_info(location varchar(255),counts int unsigned,access_date timestamp );
        • 表結構

    • 運行結果
    • 打jar包部署到集群進行測試
      • standalone模式提交

        • 腳本

          bin/spark-submit --master spark://192.168.1.27:7077,192.168.1.28:7077 --executor-memory 512m --total-executor-cores 2 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_3
        • spark控制臺運行結果顯示



        • hdfs運行結果顯示

      • yarn模式提交

        • client客戶端模式

          bin/spark-submit --master yarn --deploy-mode client --executor-memory 512m --total-executor-cores 1 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_6
        • cluster集群模式

          bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 512m --total-executor-cores 1 --class com.xiaofan.ip_location.IPLocation /home/hadoop/fanjh/demo001-1.0.0.jar hdfs://cluster/ip_location/in/ip.txt hdfs://cluster/ip_location/in/20090121000132.394251.http.format hdfs://cluster/ip_location/out_6
        • yarn的client和cluster模式區別

    • iplocation(ip的熱力圖)注意點
      • 廣播變量: 共享的內存,只讀的,只能追加; Spark Broadcast 原理鏈接
      • foreachPartition: 對每個分區的數據進行操作,可以在分區操作的時候創建外部鏈接(jedis, mysql, hbase)

    3. 三種任務提交流程standalone、yarn-cluster、yarn-client

    3.1. standalone模式

    • 任務提交流程
      • spark-submit 提交任務給 Master
      • Master 收到任務請求后通過 LaunchDriver 向 Worker 請求啟動 Driver
      • Worker 收到請求后啟動 Driver
      • Driver 啟動后向 Master 注冊(用戶App信息)
      • Master 收到 App 信息后根據資源的情況向 Worker 發送 launchExecutor 啟動 Excutor
      • Worker 收到 Master 的請求后啟動相應的 Excutor
      • Excutor 啟動后負責與 Driver 通信, 執行相關任務

    3.2. Spark on Yarn

    • Cluster集群模式

      • 作業提交流程
        • 由client向RM提交請求,并上傳jar到HDFS上(這期間包括四個步驟:)
          • 連接到RM
          • 從 RM ASM(Applications Manager )中獲得metric、queue和resource等信息
          • 上傳 app jar and spark-assembly jar
          • 設置運行環境和container上下文(launch-container.sh等腳本)
        • ASM 向 Scheduler 申請空閑 container
        • Scheduler 向 ASM 返回空閑 container 信息(NM 等)
        • RM(ASM)根據返回信息向 NM 申請資源。
        • NM 分配創建一個container 并創建Spark Application Master(AM),此時 AM 上運行的是 Spark Driver。(每個SparkContext都有一個 AM)
        • AM啟動后,和RM(ASM)通訊,請求根據任務信息向RM(ASM)申請 container 來啟動 executor
        • RM(ASM)將申請到的資源信息返回給AM
        • AM 根據返回的資源信息區請求對應的 NM 分配 container 來啟動 executor
        • NM 收到請求會啟動相應的 container 并啟動 executor
        • executor 啟動成后 反向向 AM 注冊
        • executor 和 AM 交互 完成任務
        • 后續的DAGScheduler、TaskScheduler、Shuffle等操作都是和standalone一樣
        • 等到所有的任務執行完畢后,AM 向 ASM 取消注冊并釋放資源
    • Client客戶端模式

      • 在yarn-client模式下,Driver運行在Client上,通過ApplicationMaster向RM獲取資源。本地Driver負責與所有的executor container進行交互,并將最后的結果匯總。整體的過程與yarn-cluster類似。
      • 不同點在于 Driver 是運行在本地客戶端,它的 AM 只是作為一個 Executor 啟動器,并沒有 Driver 進程。而且 Executor啟動后是與 Client 端的 Driver 進行交互的,所以 Client 如果掛了 任務也就掛了。
      • 在yarn-client、yarn-cluster 提交模式中,可以不啟動Spark集群,應為相關的jvm環境有yarn管理(啟動、結束等)。standalone 提交模式中 Spark 集群一定要啟動,因為需要依賴worker、Master進行任務的啟動、調度等。

    4. 海闊憑魚躍,天高任鳥飛

    總結

    以上是生活随笔為你收集整理的8. spark学习之旅(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。