日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL

發布時間:2025/3/15 数据库 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
首發于公眾號“大數據風控與機器學習”。

Spark SQL 是 Spark 處理結構化數據的一個模塊, 與基礎的 Spark RDD API 不同, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式可以跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 無論使用哪種 API / 語言都可以快速的計算。

SQL

Spark SQL 的功能之一是執行 SQL 查詢,Spark SQL 也能夠被用于從已存在的 Hive 環境中讀取數據。當以另外的編程語言運行SQL 時, 查詢結果將以 Dataset/DataFrame的形式返回,也可以使用 命令行或者通過 JDBC/ODBC與 SQL 接口交互.

DataFrames

從RDD里可以生成類似大家在pandas中的DataFrame,同時可以方便地在上面完成各種操作。

1.構建SparkSession

Spark SQL中所有功能的入口點是 SparkSession 類. 要創建一個 SparkSession, 僅使用 SparkSession.builder()就可以了:

from

2.創建 DataFrames

在一個 SparkSession中, 應用程序可以從一個 已經存在的 RDD 或者 hive表, 或者從Spark數據源中創建一個DataFrames.

舉個例子, 下面就是基于一個JSON文件創建一個DataFrame:

df = spark.read.json("data/people.json") df.show()#必須使用show()不然不會打印

3.DataFrame 操作

DataFrames 提供了一個特定的語法用在 Scala, Java, Python and R中機構化數據的操作。

在Python中,可以通過(df.age) 或者(df['age'])來獲取DataFrame的列. 雖然前者便于交互式操作, 但是還是建議用戶使用后者, 這樣不會破壞列名,也能引用DataFrame的類。

通過以下操作進行select

#查看字段屬性 df.printSchema()

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

df.select("name").show()

df.select(["name",'age']).show()

df.select(df['name'], df['age'] + 1).show()

以下操作的filter做條件過濾

df.filter(df['age'] > 21).show()

df.groupBy("age").count().show()

還可以創建視圖,然后使用SQL語句進行處理。得到的也是dataframe。

df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show()

spark DataFrame與RDD交互

Spark SQL 支持兩種不同的方法用于轉換已存在的 RDD 成為 Dataset

第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基于方法的反射可以讓你的代碼更簡潔.

第二種用于創建 Dataset 的方法是通過一個允許你構造一個 Schema 然后把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它允許你去構造 Dataset.

當數據不規整,無法像csv或者excel等文件一樣直接讀取時,可以通過如下形式自定義dataframe樣式。

from pyspark.sql import Rowsc = spark.sparkContext lines = sc.textFile("data/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))# Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") type(teenagers)

pyspark.sql.dataframe.DataFrame

type(teenagers.rdd)

pyspark.rdd.RDD

teenagers.rdd.first()

Row(name='Justin')

teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames:print(name)

Name: Justi

以編程的方式指定Schema

也可以通過以下的方式去初始化一個 DataFrame。

  • RDD從原始的RDD創建一個RDD的toples或者一個列表;
  • Step 1 被創建后, 創建 Schema 表示一個 StructType 匹配 RDD 中的結構.
  • 通過 SparkSession 提供的 createDataFrame 方法應用 Schema 到 RDD .
from pyspark.sql.types import *sc = spark.sparkContext# Load a text file and convert each line to a Row. lines = sc.textFile("data/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip()))# The schema is encoded in a string. schemaString = "name age"fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields)# Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema)schemaPeople.createOrReplaceTempView("people") results = spark.sql("SELECT name FROM people") results.show()

總結

以上是生活随笔為你收集整理的sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。