

Spark SQL: SQLContext (Part 2)


1. Creating the SQLContext

SQLContext is the entry point for structured data processing in Spark SQL. Through it you can create DataFrames and execute SQL. It is created as follows:

// sc is the SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

The corresponding source is:

def this(sparkContext: SparkContext) = {
  this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
}

It calls the private primary constructor:

// 1. The CacheManager parameter of the primary constructor caches query results;
//    subsequent queries automatically read from this cache.
// 2. SQLListener listens to Spark scheduler events; it extends SparkListener.
// 3. isRootContext indicates whether this is the root SQLContext.
class SQLContext private[sql](
    @transient val sparkContext: SparkContext,
    @transient protected[sql] val cacheManager: CacheManager,
    @transient private[sql] val listener: SQLListener,
    val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {

When spark.sql.allowMultipleContexts is set to true, multiple SQLContexts/HiveContexts may be created; this is done through newSession:

def newSession(): SQLContext = {
  new SQLContext(
    sparkContext = sparkContext,
    cacheManager = cacheManager,
    listener = listener,
    isRootContext = false)
}

Here isRootContext is set to false, otherwise an exception is thrown: there can be only one root SQLContext, and every other SQLContext shares the root SQLContext's SparkContext, CacheManager and SQLListener. If spark.sql.allowMultipleContexts is false, only a single SQLContext is allowed to exist.
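A minimal usage sketch of this multi-session setup, assuming a running SparkContext sc and spark.sql.allowMultipleContexts left at true; the remark about temporary tables is an inference from the fact that the catalog (next section) is defined per SQLContext instance:

val rootSqlContext = new org.apache.spark.sql.SQLContext(sc)  // the root SQLContext
val session = rootSqlContext.newSession()                     // isRootContext = false

// Both instances share sc, the CacheManager and the SQLListener, but each has its
// own catalog, so temporary tables registered in one are not visible in the other.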

2. Core member variable: catalog

protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

The catalog is used to register tables, unregister tables, check whether a table exists, and so on. For example, when a DataFrame's registerTempTable method is called:

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

registerTempTable calls the sqlContext's registerDataFrameAsTable method:

def registerTempTable(tableName: String): Unit = {
  sqlContext.registerDataFrameAsTable(this, tableName)
}

sqlContext.registerDataFrameAsTable in turn simply calls the catalog's registerTable method:

private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
  catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}

The full source of SimpleCatalog is as follows:

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
    tables.put(getTableName(tableIdent), plan)
  }

  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
    tables.remove(getTableName(tableIdent))
  }

  override def unregisterAllTables(): Unit = {
    tables.clear()
  }

  override def tableExists(tableIdent: TableIdentifier): Boolean = {
    tables.containsKey(getTableName(tableIdent))
  }

  override def lookupRelation(
      tableIdent: TableIdentifier,
      alias: Option[String] = None): LogicalPlan = {
    val tableName = getTableName(tableIdent)
    val table = tables.get(tableName)
    if (table == null) {
      throw new NoSuchTableException
    }
    val tableWithQualifiers = Subquery(tableName, table)

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
  }

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
    tables.keySet().asScala.map(_ -> true).toSeq
  }

  override def refreshTable(tableIdent: TableIdentifier): Unit = {
    throw new UnsupportedOperationException
  }
}
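The public table-management methods on SQLContext are thin wrappers around this catalog. Below is a rough round-trip sketch reusing the people DataFrame registered above; that dropTempTable and tableNames delegate to unregisterTable and getTables is an assumption based on the API shown here:

people.registerTempTable("people")                 // catalog.registerTable
sqlContext.tableNames().contains("people")         // true
sqlContext.sql("SELECT name FROM people").show()   // "people" resolved via catalog.lookupRelation
sqlContext.dropTempTable("people")                 // ends up in catalog.unregisterTable
sqlContext.tableNames().contains("people")         // false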

3. Core member variable: sqlParser

sqlParser is defined in SQLContext as:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

SparkSQLParser is the top-level Spark SQL parser; it parses the SQL syntax that Spark SQL supports and is defined as follows:

private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser

The fallback function handles any statement that SparkSQLParser does not recognize itself; here it is getSQLDialect().parse.
The keywords recognized directly by SparkSQLParser include:

protected val AS = Keyword("AS")protected val CACHE = Keyword("CACHE")protected val CLEAR = Keyword("CLEAR")protected val DESCRIBE = Keyword("DESCRIBE")protected val EXTENDED = Keyword("EXTENDED")protected val FUNCTION = Keyword("FUNCTION")protected val FUNCTIONS = Keyword("FUNCTIONS")protected val IN = Keyword("IN")protected val LAZY = Keyword("LAZY")protected val SET = Keyword("SET")protected val SHOW = Keyword("SHOW")protected val TABLE = Keyword("TABLE")protected val TABLES = Keyword("TABLES")protected val UNCACHE = Keyword("UNCACHE")

4. Core member variable: ddlParser

It is used to parse DDL (Data Definition Language) statements:

protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

The keywords it supports are:

protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")

The ddlParser handles three main things: creating tables, describing tables and refreshing tables:

protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

The createTable method is shown below (see the comments for the syntax it supports):

/**
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * AS SELECT ...
 */
protected lazy val createTable: Parser[LogicalPlan] = {
  // TODO: Support database.table.
  (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
    tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
      case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
        if (temp.isDefined && allowExisting.isDefined) {
          throw new DDLException(
            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
        }

        val options = opts.getOrElse(Map.empty[String, String])
        if (query.isDefined) {
          if (columns.isDefined) {
            throw new DDLException(
              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
          }
          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
          val mode = if (allowExisting.isDefined) {
            SaveMode.Ignore
          } else if (temp.isDefined) {
            SaveMode.Overwrite
          } else {
            SaveMode.ErrorIfExists
          }

          val queryPlan = parseQuery(query.get)
          CreateTableUsingAsSelect(tableIdent,
            provider,
            temp.isDefined,
            Array.empty[String],
            mode,
            options,
            queryPlan)
        } else {
          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
          CreateTableUsing(
            tableIdent,
            userSpecifiedSchema,
            provider,
            temp.isDefined,
            options,
            allowExisting.isDefined,
            managedIfNoPath = false)
        }
    }
}
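An illustrative statement that this rule parses; the table name and path are hypothetical, and the built-in org.apache.spark.sql.json data source is used so that no external package is required:

sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path "examples/src/main/resources/people.json")""".stripMargin)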

The code for describeTable and refreshTable is as follows:

/**
 * describe [extended] table avroTable
 * This will display all columns of table `avroTable` includes column_name,column_type,comment
 */
protected lazy val describeTable: Parser[LogicalPlan] =
  (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
    case e ~ tableIdent =>
      DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
  }

protected lazy val refreshTable: Parser[LogicalPlan] =
  REFRESH ~> TABLE ~> tableIdentifier ^^ {
    case tableIndet =>
      RefreshTable(tableIndet)
  }
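Corresponding statements for these two rules, continuing the hypothetical jsonTable from the previous example; note that the SimpleCatalog shown in section 2 throws UnsupportedOperationException from refreshTable, so REFRESH TABLE is mainly useful with a HiveContext:

sqlContext.sql("DESCRIBE jsonTable").show()            // column_name, column_type, comment
sqlContext.sql("DESCRIBE EXTENDED jsonTable").show()
sqlContext.sql("REFRESH TABLE jsonTable")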
