日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

PySpark

發布時間:2024/1/1 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PySpark 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、概念

  • 每個spark?應用都由一個驅動器程序(driver program)來發起集群上的各種并行操作

    • driver program 包含了應用的main 函數,并且定義了集群上的分布式數據集,還對這些分布式數據集應用了相關操作
    • driver program 通過一個SparkContext 對象來訪問spark
    • driver program 一般要管理多個執行器(executor) 節點
  • SparkContext:該對象代表了對計算集群的一個連接

    • 在pyspark shell 中,當shell 啟動時,已經自動創建了一個SparkContext 對象,它叫做sc。
    • 通常可以用它來創建RDD
  • 二、PySpark shell與獨立應用

    1.PySpark shell

    spark?帶有交互式的?shell,可以用于即時數據分析

    ? ? ? ?(1)spark shell 可以與分布式存儲在許多機器的內存或者硬盤上的數據進行交互,處理過程由spark 自動控制

    ? (2)pyspark shell 是 spark shell 的python 版本

    ? ? ? ? 使用pyspark shell:進入spark?的安裝目錄,然后執行bin/pyspark?。若已添加環境變量,可直接輸入:pyspark即可啟動pyspark shell。

    2.獨立應用

    獨立應用與pyspark shell?的主要區別在于:你需要自行初始化SparkContext,除此之外二者使用的API?完全相同。

    在python?中,你需要把獨立應用寫成python?腳本,然后使用Spark?自帶的bin/spark-submit?腳本來運行:

    bin/spark-submit my_script.py

    spark-submit會引入python程序的spark依賴

  • 在獨立應用中,通常使用下面方法初始化SparkContext:

    from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster('local').setAppName('My App') sc = SparkContext(conf = conf)

    首先創建一個SparkConf?對象來配置應用,然后基于該SparkConf?來創建一個SparkContext?對象。

    (1)setMaster() 給出了集群的URL,告訴spark 如何連接到集群上。這里'local' 表示讓spark 運行在單機單線程上。(2)setAppName() 給出了應用的名字。當連接到一個集群上時,這個值可以幫助你在集群管理器的用戶界面上找到你的應用。
  • 關閉spark?可以調用SparkContext?的?.stop()?方法,或者直接退出應用(如調用System.exit(0)?或者sys.exit())
  • 如果需要使用python3,則使用export PYSPARK_PYTHON=python3?來導出環境變量。

    (1)或者在代碼中使用os.environ["PYSPARK_PYTHON"]="python3"
  • 三、RDD

    1.例子:

    ? ? (1)單機模式

    import pyspark sc = pyspark.SparkContext('local[*]')txt = sc.textFile('file:usr/share/doc/python/copyright') print(txt.count())python_lines = txt.filter(lambda line: 'python' in line.lower()) print(python_lines.count())

    ? ? ? 任何PySpark程序的入口都是SparkContext對象。 該對象允許連接到Spark集群并創建RDD。 local [*]字符串是一個特殊字符串,表示正在使用本地集群,這是正在單機模式下運行的另一種表達方式。 *告訴Spark在計算機上創建與邏輯核心(logical cores)一樣多的工作線程。

    ? ? (2)分布式

    ? ? ? 使用群集時,創建SparkContext可能會涉及更多。 要連接到Spark集群,可能需要處理身份驗證以及集群特定的其他一些信息。 可以參照以下內容設置這些詳細信息:

    conf = pyspark.SparkConf() conf.setMaster('spark://head_node:56887') conf.set('spark.authenticate', True) conf.set('spark.authenticate.secret', 'secret-key') sc = SparkContext(conf=conf)

    ? ? ? ? 四、創建RDD方法

    ? ? ? ? 一旦擁有SparkContext對象,就可以開始創建RDD。

    ? ? ? ? (1)可以通過多種方式創建RDD,但是一種常見的方式是PySpark parallelize()函數。 parallelize()可以將某些Python數據結構(如列表和元組)轉換為RDD,從而使它們具有容錯性和分布式功能。

    ? ? ? ? 為了更好地理解RDD,請考慮另一個示例。 以下代碼創建一個由10,000個元素組成的迭代器,然后使用parallelize()將數據分布到2個分區中:

    >>> big_list = range(10000) >>> rdd = sc.parallelize(big_list, 2) >>> odds = rdd.filter(lambda x: x % 2 != 0) >>> odds.take(5) [1, 3, 5, 7, 9]

    ? ? ? ? parallelize()將迭代器轉換為一組分布式數字,并提供了Spark基礎架構的所有功能。

    ? ? ? ? ?請注意,此代碼使用RDD的filter()方法,而不是Python的內置filter()。 結果是一樣的,但是幕后發生的事情卻大不相同。 通過使用RDD filter()方法,該操作在多個CPU或計算機之間以分布式方式進行。

    ? ? ? ? 再次,將其想象為Spark完成multiprocessing?工作,所有這些工作都封裝在RDD數據結構中。

    ? ? ? ? take()是查看RDD內容的一種方法,但只能查看一小部分。 take()將數據子集從分布式系統中拉到單臺計算機上。

    ? ? ? ? take()對于調試很重要,因為可能無法在一臺計算機上檢查整個數據集。 RDD已針對在大數據上使用進行了優化,因此在現實世界中,一臺計算機可能沒有足夠的RAM來容納整個數據集。

    ? ? ? ?(2) 創建RDD的另一種方法是使用textFile()讀取文件,在前面的示例中已經看到過。 RDD是使用PySpark的基礎數據結構之一,因此API中的許多函數都返回RDD。

    ? ? ? ? RDD與其他數據結構之間的主要區別之一是處理被延遲直到結果被請求為止。這類似于Python生成器。 Python生態系統中通常使用懶惰評估(lazy evaluation)一詞來解釋這種行為。

    ? ? ? ? ?可以在同一RDD上堆疊多個轉換,而無需進行任何處理。之所以可以使用此功能,是因為Spark會維護轉換的有向無環圖。僅在最終結果被請求時才激活基礎圖。在前面的示例中,直到通過調用take()請求結果之后,才進行計算。

    ? ? ? ? 有多種方法可以向RDD請求結果。可以使用RDD上的collect()顯式請求評估結果并將其收集到單個群集節點。還可以通過各種方式隱式請求結果,如先前所見,其中之一就是使用count()。

    參考:

    1.https://realpython.com/pyspark-intro/

    2.http://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/spark/chapters/01_basic.html

    總結

    以上是生活随笔為你收集整理的PySpark的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。