日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

發布時間:2024/1/1 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

前言

一、PySpark基礎功能

?1.Spark SQL 和DataFrame

2.Pandas API on Spark

3.Streaming

4.MLBase/MLlib

5.Spark Core

二、PySpark依賴

Dependencies

三、DataFrame

1.創建

創建不輸入schema格式的DataFrame

創建帶有schema的DataFrame

從Pandas DataFrame創建

通過由元組列表組成的RDD創建

2.查看

DataFrame.show()

spark.sql.repl.eagerEval.enabled

縱向顯示

?查看DataFrame格式和列名

查看統計描述信息

PySpark DataFrame轉換為Pandas DataFrame

?3.查詢

添加新列實例:

條件查詢DataFrame.filter()

?4.運算

Pandas_udf

DataFrame.mapInPandas

5.分組

?聯合分組和應用函數

?6.獲取數據輸入/輸出

CSV

?Parquet

?ORC

?四、結合Spark SQL

點關注,防走丟,如有紕漏之處,請留言指教,非常感謝


前言

要想了解PySpark能夠干什么可以去看看我之前寫的文章,里面很詳細介紹了Spark的生態:

Spark框架深度理解一:開發緣由及優缺點

Spark框架深度理解二:生態圈

Spark框架深度理解三:運行架構、核心數據集RDD

PySpark只是通過JVM轉換使得Python代碼能夠在Spark集群上識別運行。故Spark的絕大多數功能都可以被Python程序使用。

上篇文章:一文速學-PySpark數據分析基礎:PySpark原理詳解

已經把PySpark運行原理講的很清楚了,現在我們需要了解PySpark語法基礎來逐漸編寫PySpark程序實現分布式數據計算。

已搭建環境:

Spark:3.3.0

Hadoop:3.3.3

Scala:2.11.12

JDK:1.8.0_201

PySpark:3.1.2


一、PySpark基礎功能

PySpark是Python中Apache Spark的接口。它不僅可以使用Python API編寫Spark應用程序,還提供了PySpark shell,用于在分布式環境中交互分析數據。PySpark支持Spark的大多數功能,如Spark SQL、DataFrame、Streaming、MLlib(機器學習)和Spark Core。

?1.Spark SQL 和DataFrame

Spark SQL是用于結構化數據處理的Spark模塊。它提供了一種稱為DataFrame的編程抽象,是由SchemaRDD發展而來。不同于SchemaRDD直接繼承RDD,DataFrame自己實現了RDD的絕大多數功能。可以把Spark SQL DataFrame理解為一個分布式的Row對象的數據集合。

Spark SQL已經集成在spark-shell中,因此只要啟動spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中執行SQL語句,需要使用SQLContext對象來調用sql()方法。Spark SQL對數據的查詢分成了兩個分支:SQLContext和HiveContext,其中HiveContext繼承了SQLContext,因此HiveContext除了擁有SQLContext的特性之外還擁有自身的特性。

Spark SQL允許開發人員直接處理RDD,同時也可查詢例如在 Apache Hive上存在的外部數據。Spark SQL的一個重要特點是其能夠統一處理關系表和RDD,使得開發人員可以輕松地使用SQL命令進行外部查詢,同時進行更復雜的數據分析。

2.Pandas API on Spark

Spark上的pandas API可以擴展使用 python pandas庫。

  • 輕松切換到pandas API和PySpark API上下文,無需任何開銷。
  • 有一個既適用于pandas(測試,較小的數據集)又適用于Spark(分布式數據集)的代碼庫。
  • 熟練使用pandas的話很快上手

3.Streaming

Apache Spark中的Streaming功能運行在Spark之上,支持跨Streaming和歷史數據的強大交互和分析應用程序,同時繼承了Spark的易用性和容錯特性。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。

4.MLBase/MLlib

MLlib構建在Spark之上,是一個可擴展的機器學習庫,它提供了一組統一的高級API,幫助用戶創建和調整實用的機器學習管道。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。

  • ML Optimizer會選擇它認為最適合的已經在內部實現好了的機器學習算法和相關參數,來處理用戶輸入的數據,并返回模型或別的幫助分析的結果;
  • MLI 是一個進行特征抽取和高級ML編程抽象的算法實現的API或平臺;
  • MLlib是Spark實現一些常見的機器學習算法和實用程序,包括分類、回歸、聚類、協同過濾、降維以及底層優化,該算法可以進行可擴充; MLRuntime 基于Spark計算框架,將Spark的分布式計算應用到機器學習領域。
    ?

5.Spark Core

Spark Core是Spark平臺的底層通用執行引擎,所有其他功能都構建在其之上。它提供了RDD(彈性分布式數據集)和內存計算能力。

二、PySpark依賴

Dependencies

Package最低版本限制Note
pandas1.0.5支撐Spark SQL
Numpy1.7滿足支撐MLlib基礎API
pyarrow1.0.0支撐Spark SQL
Py4j0.10.9.5要求
pandas1.0.5pandas API on Spark需要
pyarrow1.0.0pandas API on Spark需要
Numpy1.14pandas API on Spark需要

請注意,PySpark需要Java 8或更高版本,并正確設置Java_HOME。如果使用JDK 11,請設置Dio.netty.tryReflectionSetAccessible=true?以獲取與箭頭相關的功能。

AArch64(ARM64)用戶注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了對AArch64的PyArrow支持。如果由于PyArrow安裝錯誤導致PyArrow安裝在AArch64上失敗,可以按如下方式安裝PyArrow>=4.0.0:

pip install "pyarrow>=4.0.0" --prefer-binary

三、DataFrame

PySpark應用程序從初始化SparkSession開始,SparkSession是PySpark的入口點,如下所示。如果通過PySpark可執行文件在PySpark shell中運行它,shell會自動在變量spark中為用戶創建會話。

from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()

1.創建

PySpark DataFrame能夠通過pyspark.sql.SparkSession.createDataFrame創建,通常通過傳遞列表(list)、元組(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此類列表組成的RDD轉換。pyspark.sql.SparkSession.createDataFrame接收schema參數指定DataFrame的架構(優化可加速)。省略時,PySpark通過從數據中提取樣本來推斷相應的模式。

創建不輸入schema格式的DataFrame

from datetime import datetime, date import pandas as pd from pyspark.sql import Rowdf = spark.createDataFrame([Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)) ]) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

創建帶有schema的DataFrame

df = spark.createDataFrame([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp') df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

從Pandas DataFrame創建

pandas_df = pd.DataFrame({'a': [1, 2, 3],'b': [2., 3., 4.],'c': ['string1', 'string2', 'string3'],'d': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)] }) df = spark.createDataFrame(pandas_df) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

通過由元組列表組成的RDD創建

rdd = spark.sparkContext.parallelize([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ]) df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e']) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

?以上的DataFrame格式創建的都是一樣的。

df.printSchema() root|-- a: long (nullable = true)|-- b: double (nullable = true)|-- c: string (nullable = true)|-- d: date (nullable = true)|-- e: timestamp (nullable = true)

2.查看

DataFrame.show()

使用格式:

df.show(<int>) df.show(1) +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+ only showing top 1 row

spark.sql.repl.eagerEval.enabled

spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置。控制行數可以使用spark.sql.repl.eagerEval.maxNumRows。

spark.conf.set('spark.sql.repl.eagerEval.enabled', True) df

?

spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1) df

?

縱向顯示

行也可以垂直顯示。當行太長而無法水平顯示時,縱向顯示就很明顯。

df.show(1, vertical=True) -RECORD 0------------------a | 1b | 2.0c | string1d | 2000-01-01e | 2000-01-01 12:00:00 only showing top 1 row

?查看DataFrame格式和列名

df.columns ['a', 'b', 'c', 'd', 'e'] df.printSchema() root|-- a: long (nullable = true)|-- b: double (nullable = true)|-- c: string (nullable = true)|-- d: date (nullable = true)|-- e: timestamp (nullable = true)

查看統計描述信息

df.select("a", "b", "c").describe().show() +-------+---+---+-------+ |summary| a| b| c| +-------+---+---+-------+ | count| 3| 3| 3| | mean|2.0|3.0| null| | stddev|1.0|1.0| null| | min| 1|2.0|string1| | max| 3|4.0|string3| +-------+---+---+-------+

DataFrame.collect()將分布式數據收集到驅動程序端,作為Python中的本地數據。請注意,當數據集太大而無法容納在驅動端時,這可能會引發內存不足錯誤,因為它將所有數據從執行器收集到驅動端。

df.collect() [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

?為了避免引發內存不足異常可以使用DataFrame.take()或者是DataFrame.tail():

df.take(1) [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))] df.tail(1) [Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

PySpark DataFrame轉換為Pandas DataFrame

?PySpark DataFrame還提供了到pandas DataFrame的轉換,以利用pandas API。注意,toPandas還將所有數據收集到driver端,當數據太大而無法放入driver端時,很容易導致內存不足錯誤。

df.toPandas()

?

?3.查詢

PySpark DataFrame是惰性計算的,僅選擇一列不會觸發計算,但它會返回一個列實例:

df.a Column<'a'>

大多數按列操作都返回列:

from pyspark.sql import Column from pyspark.sql.functions import uppertype(df.c) == type(upper(df.c)) == type(df.c.isNull()) True

上述生成的Column可用于從DataFrame中選擇列。例如,DataFrame.select()獲取返回另一個DataFrame的列實例:

df.select(df.c).show() +-------+ | c| +-------+ |string1| |string2| |string3| +-------+

添加新列實例:

df.withColumn('upper_c', upper(df.c)).show() +---+---+-------+----------+-------------------+-------+ | a| b| c| d| e|upper_c| +---+---+-------+----------+-------------------+-------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1| | 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2| | 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3| +---+---+-------+----------+-------------------+-------+

條件查詢DataFrame.filter()

df.filter(df.a == 1).show() +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+

?4.運算

Pandas_udf

PySpark支持各種UDF和API,允許用戶執行Python本機函數。另請參閱最新的Pandas UDF(?Pandas UDFs)和Pandas Function API(?Pandas Function APIs)。例如,下面的示例允許用戶在Python本機函數中直接使用pandas Series中的API。

Apache Arrow in PySpark

import pandas as pd from pyspark.sql.functions import pandas_udf@pandas_udf('long') def pandas_plus_one(series: pd.Series) -> pd.Series:# Simply plus one by using pandas Series.return series + 1df.select(pandas_plus_one(df.a)).show() +------------------+ |pandas_plus_one(a)| +------------------+ | 2| | 3| | 4| +------------------+

DataFrame.mapInPandas

DataFrame.mapInPandas允許用戶在pandas DataFrame中直接使用API,而不受結果長度等任何限制。

def pandas_filter_func(iterator):for pandas_df in iterator:yield pandas_df[pandas_df.a == 1]df.mapInPandas(pandas_filter_func, schema=df.schema).show() +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+

5.分組

PySpark DataFrame還提供了一種使用常見方法,即拆分-應用-合并策略來處理分組數據的方法。它根據特定條件對數據進行分組,對每個組應用一個函數,然后將它們組合回DataFrame。

df = spark.createDataFrame([['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2']) df.show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 1| 10| | blue|banana| 2| 20| | red|carrot| 3| 30| | blue| grape| 4| 40| | red|carrot| 5| 50| |black|carrot| 6| 60| | red|banana| 7| 70| | red| grape| 8| 80| +-----+------+---+---+

?分組,然后將avg()函數應用于結果組。

df.groupby('color').avg().show() +-----+-------+-------+ |color|avg(v1)|avg(v2)| +-----+-------+-------+ | red| 4.8| 48.0| | blue| 3.0| 30.0| |black| 6.0| 60.0| +-----+-------+-------+

還可以使用pandas API對每個組應用Python自定義函數。

def plus_mean(pandas_df):return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 0| 60| | blue|banana| -1| 20| | blue| grape| 1| 40| | red|banana| -3| 10| | red|carrot| -1| 30| | red|carrot| 0| 50| | red|banana| 2| 70| | red| grape| 3| 80| +-----+------+---+---+

?聯合分組和應用函數

df1 = spark.createDataFrame([(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],('time', 'id', 'v1'))df2 = spark.createDataFrame([(20000101, 1, 'x'), (20000101, 2, 'y')],('time', 'id', 'v2'))def asof_join(l, r):return pd.merge_asof(l, r, on='time', by='id')df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(asof_join, schema='time int, id int, v1 double, v2 string').show() +--------+---+---+---+ | time| id| v1| v2| +--------+---+---+---+ |20000101| 1|1.0| x| |20000102| 1|3.0| x| |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+

?6.獲取數據輸入/輸出

CSV簡單易用。Parquet和ORC是高效緊湊的文件格式,讀寫速度更快。

PySpark中還有許多其他可用的數據源,如JDBC、text、binaryFile、Avro等。另請參閱Apache Spark文檔中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide

CSV

df.write.csv('foo.csv', header=True) spark.read.csv('foo.csv', header=True).show()

這里記錄一個報錯:

java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

?將Hadoop安裝目錄下的 bin 文件夾中的 hadoop.dll 和 winutils.exe 這兩個文件拷貝到 C:\Windows\System32 下,問題解決。

+---+---+-------+----------+--------------------+ | a| b| c| d| e| +---+---+-------+----------+--------------------+ | 1|2.0|string1|2000-01-01|2000-01-01T12:00:...| | 2|3.0|string2|2000-02-01|2000-01-02T12:00:...| | 3|4.0|string3|2000-03-01|2000-01-03T12:00:...| +---+---+-------+----------+--------------------+

?Parquet

df.write.parquet('bar.parquet') spark.read.parquet('bar.parquet').show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 6| 60| | blue|banana| 2| 20| | blue| grape| 4| 40| | red|carrot| 5| 50| | red|banana| 7| 70| | red|banana| 1| 10| | red|carrot| 3| 30| | red| grape| 8| 80| +-----+------+---+---+

?ORC

df.write.orc('zoo.orc') spark.read.orc('zoo.orc').show()

+-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 7| 70| | red| grape| 8| 80| |black|carrot| 6| 60| | blue|banana| 2| 20| | red|banana| 1| 10| | red|carrot| 5| 50| | red|carrot| 3| 30| | blue| grape| 4| 40| +-----+------+---+---+

?四、結合Spark SQL

DataFrame和Spark SQL共享同一個執行引擎,因此可以無縫地互換使用。例如,可以將數據幀注冊為表,并按如下方式輕松運行SQL:

df.createOrReplaceTempView("tableA") spark.sql("SELECT count(*) from tableA").show() +--------+ |count(1)| +--------+ | 8| +--------+

?此外UDF也可在現成的SQL中注冊和調用

@pandas_udf("integer") def add_one(s: pd.Series) -> pd.Series:return s + 1spark.udf.register("add_one", add_one) spark.sql("SELECT add_one(v1) FROM tableA").show()

?

這些SQL表達式可以直接混合并用作PySpark列。

from pyspark.sql.functions import exprdf.selectExpr('add_one(v1)').show() df.select(expr('count(*)') > 0).show()


點關注,防走丟,如有紕漏之處,請留言指教,非常感謝

以上就是本期全部內容。我是fanstuck ,有問題大家隨時留言討論 ,我們下期見。

總結

以上是生活随笔為你收集整理的PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。