日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

案例 | 用pdpipe搭建pandas数据分析流水线

發布時間:2025/3/8 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 案例 | 用pdpipe搭建pandas数据分析流水线 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作者:費弗里

博客地址:

https://www.cnblogs.com/feffery/p/12179647.html

說明:本文經作者授權轉載,禁止二次轉載

本文對應腳本及數據已上傳至我的Github倉庫https://github.com/CNFeffery/DataScienceStudyNotes

1 簡介

在數據分析任務中,從原始數據讀入,到最后分析結果出爐,中間絕大部分時間都是在對數據進行一步又一步的加工規整,以流水線(pipeline)的方式完成此過程更有利于梳理分析脈絡,也更有利于查錯改正。pdpipe作為專門針對pandas進行流水線化改造的模塊,為熟悉pandas的數據分析人員書寫優雅易讀的代碼提供一種簡潔的思路,本文就將針對pdpipe的用法進行介紹。

2 pdpipe常用功能介紹

pdpipe的出現極大地對數據分析過程進行規范,其主要擁有以下特性:

  • 簡潔的語法邏輯

  • 在流水線工作過程中可輸出規整的提示或錯誤警報信息

  • 輕松串聯不同數據操作以組成一條完整流水線

  • 輕松處理多種類型數據

  • 純Python編寫,便于二次開發

通過pip install pdpipe安裝完成,接下來我們將在jupyter lab中以TMDB 5000 Movie Dataset中的tmdb_5000_movies.csv數據集(圖1)為例來介紹pdpipe的主要功能。
這是Kaggle上的公開數據集,記錄了一些電影的相關屬性信息,你也可以在數據科學學習手札系列文章的Github倉庫對應本篇文章的路徑下直接獲取該數據集。

圖1 TMDB 5000 Movie Dataset數據集

2.1 從一個簡單的例子開始

首先在jupyter lab中讀入tmdb_5000_movies.csv數據集并查看其前3行(圖2):

import pandas as pd import pdpipe# 讀入tmdb_5000_movies.csv數據集并查看前3行 data = pd.read_csv('tmdb_5000_movies.csv');data.head(3) 圖2

可以看出,數據集包含了數值、日期、文本以及json等多種類型的數據,現在假設我們需要基于此數據完成以下流程:

1、刪除original_title
2、對title列進行小寫化處理
3、丟掉vote_average小于等于7,且original_language不為en的行
4、求得genres對應電影類型的數量保存為新列genres_num,并刪除原有的genres
5、丟掉genres_num小于等于5的行

上述操作直接使用pandas并不會花多少時間,但是想要不創造任何中間臨時結果一步到位產生所需的數據框子集,并且保持代碼的可讀性不是一件太容易的事,但是利用pdpipe,我們可以非常優雅地實現上述過程:

# 以pdp.PdPipeline傳入流程列表的方式創建pipeline first_pipeline = pdp.PdPipeline([pdp.ColDrop("original_title"),pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),pdp.RowDrop({'genres_num': lambda x: x <= 5})])# 將創建的pipeline直接作用于data直接得到所需結果,并打印流程信息 first_pipeline(data, verbose=True).reset_index(drop=True)

得到的結果如圖3所示:

圖3

我們不僅保證了代碼優雅簡潔,可讀性強,結果的一步到位,還自動打印出整個流水線運作過程的狀態說明!
令人興奮的是pdpipe充分封裝了pandas的核心功能尤其是apply相關操作,使得常規或非常規的數據分析任務都可以利用pdpipe中的API結合自定義函數來優雅地完成,小小領略到pdpipe的妙處之后,下文我們來展開詳細介紹。

2.2 pdpipe中的重要子模塊

pdpipe中的API按照不同分工被劃分到若干子模塊,下面將針對常用的幾類API展開介紹。

2.2.1 basic_stages

basic_stages中包含了對數據框中的行、列進行丟棄/保留、重命名以及重編碼的若干類:

ColDrop:
  這個類用于對指定單個或多個列進行丟棄,其主要參數如下:

  • columns:字符串或列表,用于指定需要丟棄的列名

  • errors:字符串,傳入 'ignore' 或 'raise',用于指定丟棄指定列時遇到錯誤采取的應對策略,'ignore'表示忽略異常,'raise'表示拋出錯誤打斷流水線運作,默認為'raise'

下面是舉例演示(注意單個流水線部件可以直接傳入源數據執行apply方法直接得到結果),我們分別對單列和多列進行刪除操作:

  • 單列刪除

# 刪除budget列 pdp.ColDrop(columns='budget').apply(data).head(3)

刪除后得到的結果如圖4:

圖4
  • 多列刪除

# 刪除budget之外的所有列del_col = data.columns.tolist() del_col.remove('budget') pdp.ColDrop(columns=del_col).apply(data).head(3)

得到的結果中只有budget列被保留,如圖5:

圖5

ColRename:
  這個類用于對指定列名進行重命名,其主要參數如下:

  • rename_map:字典,傳入舊列名->新列名鍵值對

下面是舉例演示:

  • 列重命名

# 將budget重命名為Budget pdp.ColRename(rename_map={'budget': 'Budget'}).apply(data).head(3)

結果如圖6:

圖6

ColReorder:
  這個類用于修改列的順序,其主要參數如下:

  • positions:字典,傳入列名->新的列下標鍵值對

下面是舉例演示:

  • 修改列位置

# 將budget從第0列挪動為第3列 pdp.ColReorder(positions={'budget': 3}).apply(data).head(3)

結果如圖7:

圖7

DropNa:
  這個類用于丟棄數據中空值元素,其主要參數與pandas中的dropna()保持一致,核心參數如下:

  • axis:0或1,0表示刪除含有缺失值的行,1表示刪除含有缺失值的列

下面是舉例演示,首先我們創造一個包含缺失值的數據框:

import numpy as np # 創造含有缺失值的示例數據 df = pd.DataFrame({'a': [1, 4, 1, 5],'b': [4, None, np.nan, 7]}) df 圖8
  • 刪除缺失值所在行

# 刪除含有缺失值的行 pdp.DropNa(axis=0).apply(df)

結果如圖9:

圖9
  • 刪除缺失值所在列

# 刪除含有缺失值的列 pdp.DropNa(axis=1).apply(df)

結果如圖10:

圖10

FreqDrop:
  這個類用于刪除在指定的一列數據中出現頻次小于所給閾值對應的全部行,主要參數如下:

  • threshold:int型,傳入頻次閾值,低于這個閾值的行將會被刪除

  • column:str型,傳入threshold參數具體作用的列

下面是舉例演示,首先我們來查看電影數據集中original_language列對應的頻次分布情況:

# 查看original_language頻次分布 pd.value_counts(data['original_language']) 圖11

下面我們來過濾刪除original_language列出現頻次小于10的行:

# 過濾original_language頻次低于10的行,再次查看過濾后的數據original_language頻次分布 pd.value_counts(pdp.FreqDrop(threshold=10, column='original_language').apply(data)['original_language']) 圖12

RowDrop:
  這個類用于刪除滿足指定限制條件的行,主要參數如下:

  • conditions:dict型,傳入指定列->該列刪除條件鍵值對

  • reduce:str型,用于決定多列組合條件下的刪除策略,'any'相當于條件或,即滿足至少一個條件即可刪除;'all'相當于條件且,即滿足全部條件才可刪除;'xor'相當于條件異或,即當恰恰滿足一個條件時才會刪除,滿足多個或0個都不進行刪除。默認為'any'

下面是舉例演示,我們以budget小于100000000genres不包含Actionrelease_date缺失以及vote_count小于1000作為組合刪除條件,分別查看在三種不同刪除策略下的最終得以保留的數據行數:

  • 刪除策略:any

pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,'genres': lambda x: 'Action'notin x,'release_date': lambda x: x == np.nan,'vote_count': lambda x: x <= 1000},reduce='any').apply(data).shape[0]
  • 刪除策略:all

pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,'genres': lambda x: 'Action'notin x,'release_date': lambda x: x == np.nan,'vote_count': lambda x: x <= 1000},reduce='all').apply(data).shape[0]
  • 刪除策略:xor

pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,'genres': lambda x: 'Action'notin x,'release_date': lambda x: x == np.nan,'vote_count': lambda x: x <= 1000},reduce='xor').apply(data).shape[0]

對應的結果如下:

圖13

2.2.2 col_generation

col_generation中包含了從原數據中產生新列的若干功能:

AggByCols:
  這個類用于將指定的函數作用到指定的列上以產生新結果(可以是新的列也可以是一個聚合值),即這時函數真正傳入的最小計算對象是列,主要參數如下:

  • columns:str或list,用于指定對哪些列進行計算

  • func:傳入需要計算的函數

  • drop:bool型,決定是否在計算完成后把舊列刪除,默認為True,即對應列的計算結果直接替換掉對應的舊列

  • suffix:str型,控制新列后綴名,當drop參數設置為False時,結果列的列名變為其對應列+suffix參數指定的后綴名;當drop設置為False時,此參數將不起作用(因為新列直接繼承了對應舊列的名稱)

  • result_columns:str或list,與columns參數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個參數就變得非常有用,默認為None

  • func_desc:str型,可選參數,為你的函數添加說明文字,默認為None

下面我們來舉例演示幫助理解上述各個參數:

  • 針對單個列進行計算

pdp.AggByCols(columns='budget',func=np.log).apply(data).head(3)

對應的結果如圖14,可以看到在只傳入columns和func這兩個參數,其他參數均為默認值時,對budget列做對數化處理后的新列直接覆蓋了原有的budget列:

圖14

設置drop參數為False,并將suffix參數設置為'_log':

# 設置drop參數為False,并將suffix參數設置為'_log' pdp.AggByCols(columns='budget',func=np.log,drop=False,suffix='_log').apply(data).head(3) 圖15

可以看到這時原有列得以保留,新的列以舊列名+后綴名的方式被添加到舊列之后,下面我們修改result_columns參數以自定義結果列名:

# 設置drop參數為False,并將suffix參數設置為'_log' pdp.AggByCols(columns='budget',func=np.log,result_columns='budget(log)').apply(data).head(3)圖16
  • 針對多個列進行計算

pdp.AggByCols(columns=['budget', 'revenue'],func=np.log,drop=False,suffix='_log').apply(data).head(3) 圖17
  • 計算列的聚合值

pdp.AggByCols(columns='budget',func=np.mean, # 這里傳入的函數是聚合類型的drop=False,suffix='_mean').apply(data).loc[:, ['budget', 'budget_mean']]

這時為了保持整個數據框形狀的完整,計算得到的聚合值填充到新列的每一個位置上:

圖18

ApplyByCols:
  這個類用于實現pandas中對列的apply操作,不同于AggByCols中函數直接處理的是列,ApplyByCols中函數直接處理的是對應列中的每個元素。主要參數如下:

  • columns:str或list,用于指定對哪些列進行apply操作

  • func:傳入需要計算的函數

  • drop:bool型,決定是否在計算完成后把舊列刪除,默認為True,即對應列的計算結果直接替換掉對應的舊列

  • colbl_sfx:str型,控制新列后綴名,當drop參數設置為False時,結果列的列名變為其對應列+suffix參數指定的后綴名;當drop設置為False時,此參數將不起作用(因為新列直接繼承了對應舊列的名稱)

  • result_columns:str或list,與columns參數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個參數就變得非常有用,默認為None

  • func_desc:str型,可選參數,為你的函數添加說明文字,默認為None

下面我們來舉例演示幫助理解上述各個參數:

  • 求spoken_languages涉及語言數量   下面的示例對每部電影中涉及的語言語種數量進行計算:

pdp.ApplyByCols(columns='spoken_languages',func=lambda x: [item['name'] for item in eval(x)].__len__(),drop=False,result_columns='spoken_languages_num').apply(data)[['spoken_languages', 'spoken_languages_num']]

對應的結果:

圖19

ApplyToRows:
  這個類用于實現pandas中對行的apply操作,傳入的計算函數直接處理每一行,主要參數如下:

  • func:傳入需要計算的函數,對每一行進行處理

  • colname:str型,用于定義結果列的名稱(因為ApplyToRows作用的對象是一整行,因此只能形成一列返回值),默認為'new_col'

  • follow_column:str型,控制結果列插入到指定列名之后,默認為None,即放到最后一列

  • func_desc:str型,可選參數,為你的函數添加說明文字,默認為None

下面我們來舉例演示幫助理解上述各個參數:

  • 得到對應電影的盈利簡報

pdp.ApplyToRows(func=lambda row: f"{row['original_title']}: {round(((row['revenue'] / row['budget'] -1)*100), 2)}%"if row['budget'] != 0elsef"{row['original_title']}: 因成本為0故不進行計算",colname='movie_desc',follow_column='budget',func_desc='輸出對應電影的盈利百分比').apply(data).head(3)

對應的結果:

圖20

Bin:
  這個類用于對連續型數據進行分箱,主要參數如下:

  • bin_map:字典型,傳入列名->分界點列表

  • drop:bool型,決定是否在計算完成后把舊列刪除,默認為True,即對應列的計算結果直接替換掉對應的舊列

下面我們以計算電影盈利率小于0,大于0小于100%以及大于100%作為三個分箱區間,首先我們用到上文介紹過的RowDrop丟掉那些成本或利潤為0的行,再用ApplyToRows來計算盈利率,最終使用Bin進行分箱:

  • 為電影盈利率進行數據分箱

pipeline = pdp.PdPipeline([pdp.RowDrop(conditions={'budget': lambda x: x == 0,'revenue': lambda x: x == 0},reduce='any'),pdp.ApplyToRows(func=lambda row: row['revenue'] / row['budget'] - 1,colname='rate of return',follow_column='budget'),pdp.Bin(bin_map={'rate of return': [0, 1]}, drop=False)]) pipeline(data).head(3)

對應的結果:

圖21

OneHotEncode:
  這個類用于為類別型變量創建啞變量(即獨熱處理),效果等價于pandas中的get_dummies,主要參數如下:

  • columns:str或list,用于指定需要進行啞變量處理的列名,默認為None,即對全部類別型變量進行啞變量處理

  • dummy_na:bool型,決定是否將缺失值也作為啞變量的一個類別進行輸出,默認為False即忽略缺失值

  • exclude_columns:list,當columns參數設置為None時,這個參數傳入的列名列表中指定的列將不進行啞變量處理,默認為None,即不對任何列進行排除

  • drop_first:bool型或str型,默認為True,這個參數是針對啞變量中類似這樣的情況:譬如有類別型變量性別{男性女性},那么實際上只需要產生一列0-1型啞變量即可表示原始變量的信息,即性別{男性女性}->男性{01},0代表不為男性即女性,1相反,而drop_dirst設置為False時,原始變量有幾個類別就對應幾個啞變量被創造;當設置為指定類別值時(譬如設置drop_first = '男性'),這個值對應的類別將不進行啞變量生成

  • drop:bool型,控制是否在生成啞變量之后刪除原始的類別型變量,默認為True即刪除

下面我們偽造包含啞變量的數據框:

# 偽造的數據框 df = pd.DataFrame({'a': ['x', 'y', 'z'],'b': ['i', 'j', 'q'] }) df 圖22

默認參數下執行OneHotEncode:

pdp.OneHotEncode().apply(df) 圖23

設置drop_first為False:

pdp.OneHotEncode(drop_first=False).apply(df) 圖23

2.2.3 text_stages

text_stages中包含了對數據框中文本型變量進行處理的若干類,下文只介紹其中我認為最有用的:

RegexReplace:
  這個類用于對文本型列進行基于正則表達式的內容替換,其主要參數如下:

  • columns:str型或list型,傳入要進行替換的單個或多個列名

  • pattern:str,傳入匹配替換內容的正則表達式

  • replace:str,傳入替換后的新字符串

  • result_columns:str或list,與columns參數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個參數就變得非常有用,默認為None,即直接替換原始列

  • drop:bool型,用于決定是否刪除替換前的原始列,默認為True,即刪除原始列

下面是舉例演示:

  • 替換original_language中的'en'或'cn'為'英文/中文'

pdp.RegexReplace(columns='original_language',pattern='en|cn',replace='英文/中文').apply(data)['original_language'].unique()

結果如圖24:

圖24

2.3 組裝pipeline的幾種方式

上文中我們主要演示了單一pipeline部件工作時的細節,接下來我們來了解pdpipe中組裝pipeline的幾種方式:

2.3.1 PdPipeline

這是我們在2.1中舉例說明使用到的創建pipeline的方法,直接傳入由按順序的pipeline組件組成的列表便可生成所需pipeline,而除了直接將其視為函數直接傳入原始數據和一些輔助參數(如verbose控制是否打印過程)之外,還可以用類似scikit-learn中的fit_transform方法:

# 調用pipeline的fit_transform方法作用于data直接得到所需結果,并打印流程信息 first_pipeline.fit_transform(data, verbose=1) 圖25

2.3.2 make_pdpipeline

與PdpPipeline相似,make_pdpipeline不可以傳入pipeline組件形成的列表,只能把每個pipeline組件當成位置參數按順序傳入:

# 以make_pdpipeline將pipeline組件作為位置參數傳入的方式創建pipeline first_pipeline1 = pdp.make_pdpipeline(pdp.ColDrop("original_title"),pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),pdp.RowDrop({'genres_num': lambda x: x <= 5}))# 以pdp.PdPipeline傳入流程列表的方式創建pipeline first_pipeline2 = pdp.PdPipeline([pdp.ColDrop("original_title"),pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),pdp.RowDrop({'genres_num': lambda x: x <= 5})])# 比較兩種不同方式創建的pipeline產生結果是否相同 first_pipeline1.fit_transform(data) == first_pipeline2(data)

比較結果如圖26,兩種方式殊途同歸:

圖26

以上就是本文全部內容,如有筆誤望指出!

參考資料:

https://pdpipe.github.io/pdpipe/doc/pdpipe/
https://tirthajyoti.github.io/Notebooks/Pandas-pipeline-with-pdpipe

-END-

備注:公眾號菜單包含了整理了一本AI小抄非常適合在通勤路上用學習。

往期精彩回顧2019年公眾號文章精選適合初學者入門人工智能的路線及資料下載機器學習在線手冊深度學習在線手冊AI基礎下載(第一部分)備注:加入本站微信群或者qq群,請回復“加群”加入知識星球(4500+用戶,ID:92416895),請回復“知識星球”

喜歡文章,點個在看

總結

以上是生活随笔為你收集整理的案例 | 用pdpipe搭建pandas数据分析流水线的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。