日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python读取hive方案分析

發布時間:2023/12/31 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python读取hive方案分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

python讀取hive方案對比

引言

最近接到一項任務–開發python工具,方便從HDFS讀取文件和Hive表數據。當前網上的方案大多是通過第三方python包實現,只需導入指定pypi包即可完成,這種方案雖然在功能上具有可行性,但是當數據量級增大時,讀取數據效率低下,無法滿足業務場景需要,為此需調研其他方案實現python讀取Hive表功能。

方案分析

方案一(pyhive):

目前實驗場景下常見的方案使用pyhive,pyhive通過與HiveServer2通訊來操作Hive數據。當hiveserver2服務啟動后,會開啟10000的端口,對外提供服務,此時pyhive客戶端通過JDBC連接hiveserver2進行Hive sql操作。

Pyhive Client通過JDBC與hiveserver2建立通信,hiveserver2服務端發送HQL語句到Driver端,Driver端將HQL發送至Compiler組件進行語法樹解析,此時需在metastore獲取HQL相關的database和table等信息,在對HQL完成解析后,Compiler組件發送執行計劃至Driver端等待處理,Driver端發送執行計劃至Executor端,再由Executor端發送MapReduce任務至Hadoop集群執行Job,Job完成后最終將HQL查詢數據發送Driver端,再由hive server2返回數據至pyhive Client。

python讀取hive表的Demo:

from pyhive import hivedef read_jdbc(host, port, database: str, table: str, query_sql: str) -> DataFrame:# 1、連接hive服務端hive.Connection(host=host, port=10000, database=database)cursor = conn.cursor()logger.info('connect hive successfully.')# 2、執行hive sqlcursor.execute(query_sql)logger.info('query hive table successfully.')# 3、返回pandas.dataframetable_len = len(table) + 1columns = [col[0] for col in cursor.description]col = list(map(lambda x: x[table_len:], columns))result = cursor.fetchall()return pd.DataFrame(result, columns=col)

方案二(impyla):

目前還有用戶通過impyla訪問hive表,impyla通過與HiveServer2通訊來操作Hive數據。當hiveserver2服務啟動后,會開啟10000的端口,對外提供服務,此時impyla客戶端通過JDBC連接hiveserver2進行Hive sql操作。impyla與hive通信方式和大體相同,具體流程可以參考方案一流程圖。

python讀取hive表的Demo:

from impala.dbapi import connectdef read_jdbc(host, port, database: str, table: str, query_sql: str) -> DataFrame:# 1、連接hive服務端conn = connect(host=host, port=10000, database="test", auth_mechanism='PLAIN')cursor = conn.cursor()# 2、執行hive sqlcursor.execute(query_sql)logger.info('query hive table successfully.')# 3、返回pandas.dataframetable_len = len(table) + 1columns = [col[0] for col in cursor.description]col = list(map(lambda x: x[table_len:], columns))result = cursor.fetchall()return pd.DataFrame(result, columns=col)

方案三(pyarrow+thrift):

從方案一流程圖中可以了解到上述兩種方案都JDBC和服務端建立連接,客戶端和hiveserver2建立通信后,解析Hive sql并執行MapReduce的方式訪問Hive數據文件,當Hive數據量增大時,對數據進行MapReduce操作和數據之間的網絡傳輸會使得讀取數據面臨延遲高,效率低等問題。
分析上述方案我們可知,在Hadoop集群進行Mapreduce,查詢后結果數據經Driver、Executor和hiveserver2才可返回至Client,在數據量級增大的情況下,這些步驟無疑會成為制約python訪問hive的效率的因素,為了解決上述問題,我們采用直接讀取Hdfs存儲文件的方式獲取Hive數據的方式,規避上述問題。

  • hive metastore中存儲Hive創建的database、table、表的字段、存儲位置等元信息,在讀取HDFS文件之前,首先需通過thrift協議和hive metastore服務端建立連接,獲取元數據信息;
  • 為了解決數據快速增長和復雜化的情況下,大數據分析性能低下的問題,Apache Arrow應運而生,在讀取HDFS文件時采用pyarrow讀取hive數據文件的方式。

    為了在本地生成hive metastore服務端文件,首先在hive源碼中下載hive_metastore.thrift文件,在thrift源碼中下載fb303.thrift文件,其次執行以下命令。
thrift -gen py fb303.thrift thrift -gen py hive_metastore.thrift

執行后可以得到以下目錄文件

python向hive表中寫入數據和讀取hive表的Demo:

from hive_service import ThriftHive from hive_service.ttypes import HiveServerException from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol import subprocessfrom pyarrow.parquet import ParquetDataSet import pyarrow.parquet as pq import pyarrow as pa from libraries.hive_metastore.ThriftHiveMetastore import Clientdef connect_hive() -> Client:"""通過thrift連接hive metastore服務端"""transport = TSocket.TSocket(host, int(port))transport = TTransport.TBufferedTransport(transport)transport.open()protocol = TBinaryProtocol.TBinaryProtocol(transport)return ThriftHiveMetastore.Client(protocol)def write_table(client: Client, database: str, table: str, dataframe: DataFrame, partitions: list = None):"""提供給用戶將dataFrame寫入hive表中的方式Examples:client = connect_hive(host, port)df = pd.DataFrame({'index': [1, 2, 3],'name': ['xiaoming', 'xiaowang', 'xiaozhang'],'prt_dt': ['2020', '2019', '2020']})partition_cols = ['prt_dt']write_table(client, database, table, df, partition_cols)Args:client(Client):hive客戶端,通過thrift協議訪問hive metastoredatabase(str):數據庫table(str):表名dataframe(pandas.DataFrame):pandas.DataFramepartitions(list):分區信息raise:HiveDatabaseNOTEXIST:Hive庫不存在時拋出異常HiveTableNOTEXIST:Hive表不存在時拋出異常"""# 1、連接hive服務端client = connect_hive(host, port)# 2、檢查數據庫是否存在,如果不存在則拋出異常databases = client.get_all_databases()if database not in databases:raise HiveDatabaseNOTEXIST('Hive database is not exist.')# 3、創建hive表,如果表名重復則拋出異常tables = client.get_all_tables(database)if table not in tables:raise HiveTableNOTEXIST('Hive table is not exist.')# 4、將pandas中字段int64類型轉為intcolumns = dataframe.columnsint64_fields = {}float64_fields = {}for field in columns:if pd.api.types.is_int64_dtype(dataframe[field]):int64_fields[field] = 'int32'if pd.api.types.is_float_dtype(dataframe[field]):float64_fields[field] = 'float32'transfer_fields = dict(int64_fields, **float64_fields)transfer_df = dataframe.astype(transfer_fields)# 5、將dataframe寫入hive表中table_hdfs_path = client.get_table(database, table).sd.locationtable = pa.Table.from_pandas(transfer_df)pq.write_to_dataset(table=table, root_path=table_hdfs_path, partition_cols=partitions)# 6、寫入分區表時需刷新元數據信息(msck repair table ***)shell = "hive -e 'msck repair table {}' ".format('train_data.telecom_train')subprocess.Popen(shell,shell=True)def read_table(data_source: DataSource, database: str, table: str, partitions: list = None) -> DataFrame:"""提供給用戶根據hive庫名和表名訪問數據的方式-->dataframe(thrift、urllib、pyarrow、pyhdfs)Examples:client = connect_hive(host, port)read_table(client,'test','test')Args:client(Client):hive客戶端,通過thrift協議訪問hive metastoredatabase(str):hive庫名table(str):hive表名partitions(list):hive表分區(用戶需按照分區目錄填寫),如果查詢所有數據,則無需填寫分區Return:pandas.dataframe"""# 1、連接hive服務端client = connect_hive(host, port)# 2、查詢hive表元數據table = client.get_table(database, table)table_hdfs_path = table.sd.locationlogging.info('table_hdfs_path:' + table_hdfs_path)print(table_hdfs_path)# 3、判斷hive是否為分區表,當用戶沒有輸入partitions時需查找所有分區數據if partitions is not None:table_hdfs_path = [table_hdfs_path + constant.FILE_SEPARATION + x for x in partitions][0]dataframe = pq.ParquetDataset(table_hdfs_path).read().to_pandas()# pyarrow訪問分區目錄時,dataframe不含分區列,因此需添加分區列信息for partition in partitions:index = partition.find('=')field = partition[:index]field_value = partition[index + 1:]dataframe[field] = field_valueelse:dataframe = pq.ParquetDataset(table_hdfs_path).read().to_pandas()return dataframe

方案對比

為了驗證分析三種方案在讀取數據性能的差異,我們設置了對比實驗,準備27維數據,在數據量不斷遞增情況下執行SELECT查詢語句,我們可以得到如下折線圖。

方式在讀取效率上優于pyarrow+thrift方案,此后,隨著數據量級不斷增大,pyarrow+thrift方案較其他兩種方案有明顯優勢。在線下測試中我們發現,讀取百萬級數據時,pyhive和impyla需要大約4分鐘,而pyarrow+thrift只需20s。

結論

上一章節中,三種方案在讀取同一數據時性能上的差異,可以清楚知道數據量在3w左右時,三種方案在讀取數據性能上的表現相差不大,但當數據量級不斷增大時,通過pyarrow+thrift方案在讀取性能上明顯優于前兩種方案。因此,在萬級數據以上推薦使用pyarrow+thrift方式訪問Hive數據,可以極大提高python讀取hive數據的效率。

總結

以上是生活随笔為你收集整理的python读取hive方案分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。