日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(二十):Spark Core外部数据源引入

發布時間:2023/11/28 生活经验 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(二十):Spark Core外部数据源引入 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

外部數據源

MySQL 數據源

演示代碼

HBase 數據源

HBase Sink

???????HBase Source


外部數據源

?

Spark可以從外部存儲系統讀取數據,比如RDBMs表中或者HBase表中讀寫數據,這也是企業中常常使用,如:

?1)、要分析的數據存儲在HBase表中,需要從其中讀取數據數據分析

日志數據:電商網站的商家操作日志

訂單數據:保險行業訂單數據

?2)、使用Spark進行離線分析以后,往往將報表結果保存到MySQL表中

網站基本分析(pv、uv。。。。。)

注意:實際開發中會封裝為工具類直接使用

https://github.com/teeyog/blog/issues/22

https://blog.csdn.net/u011817217/article/details/81667115

?

?

?

MySQL 數據源

?????實際開發中常常將分析結果RDD保存至MySQL表中,使用foreachPartition函數;此外Spark中提供JdbcRDD用于從MySQL表中讀取數據。

調用RDD#foreachPartition函數將每個分區數據保存至MySQL表中,保存時考慮降低RDD分區數目和批量插入,提升程序性能。

演示代碼

package cn.itcast.coreimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}/*** Author itcast* Desc 演示使用Spark將數據寫入到MySQL,再從MySQL讀取出來*/
object SparkJdbcDataSource {def main(args: Array[String]): Unit = {//1.創建SparkContextval sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")//2.準備數據val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))//3.將RDD中的數據保存到MySQL中去//將每一個分區中的數據保存到MySQL中去,有幾個分區,就會開啟關閉連接幾次//data.foreachPartition(itar=>dataToMySQL(itar))data.foreachPartition(dataToMySQL) //方法即函數,函數即對象//4.從MySQL讀取數據/*class JdbcRDD[T: ClassTag](sc: SparkContext,getConnection: () => Connection,sql: String,lowerBound: Long,upperBound: Long,numPartitions: Int,mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)*/val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"val mapRow = (rs:ResultSet) => {val id: Int = rs.getInt(1)val name: String = rs.getString(2)val age: Int = rs.getInt("age")(id,name,age)}val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)println(studentRDD.collect().toBuffer)}/*** 將分區中的數據保存到MySQL* @param itar 傳過來的每個分區有多條數據*/def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {//0.加載驅動//Class.forName("") //源碼中已經加載了//1.獲取連接val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")//2.編寫sqlval sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"//3.獲取psval ps: PreparedStatement = connection.prepareStatement(sql)itar.foreach(data=>{//4.設置參數ps.setString(1,data._1)ps.setInt(2,data._2)//5.執行sqlps.addBatch()})ps.executeBatch()ps.close()connection.close()}
}

?

???????HBase 數據源

Spark可以從HBase表中讀寫(Read/Write)數據,底層采用TableInputFormatTableOutputFormat方式,與MapReduce與HBase集成完全一樣,使用輸入格式InputFormat和輸出格式OutputFoamt。

?

?

???????HBase Sink

回顧MapReduce向HBase表中寫入數據,使用TableReducer,其中OutputFormat為TableOutputFormat,讀取數據Key:ImmutableBytesWritable(Rowkey),Value:Put(Put對象)。

寫入數據時,需要將RDD轉換為RDD[(ImmutableBytesWritable, Put)]類型,調用saveAsNewAPIHadoopFile方法數據保存至HBase表中。

HBase Client連接時,需要設置依賴Zookeeper地址相關信息及表的名稱,通過Configuration設置屬性值進行傳遞。

?

范例演示:將詞頻統計結果保存HBase表,表的設計

?

代碼如下:

package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 將RDD數據保存至HBase表中*/
object SparkWriteHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 構建RDDval list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)// 將數據寫入到HBase表中, 使用saveAsNewAPIHadoopFile函數,要求RDD是(key, Value)// ?組裝RDD[(ImmutableBytesWritable, Put)]/*** HBase表的設計:* 表的名稱:htb_wordcount* Rowkey: ?word* 列簇: ???info* 字段名稱: count*/val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>iter.map { case (word, count) =>// 創建Put實例對象val put = new Put(Bytes.toBytes(word))// 添加列put.addColumn(// 實際項目中使用HBase時,插入數據,先將所有字段的值轉為String,再使用Bytes轉換為字節數組Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString))// 返回二元組(new ImmutableBytesWritable(put.getRow), put)}}// 構建HBase Client配置信息val conf: Configuration = HBaseConfiguration.create()// 設置連接Zookeeper屬性conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 設置將數據保存的HBase表的名稱conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")/*def saveAsNewAPIHadoopFile(path: String,// 保存的路徑keyClass: Class[_], // Key類型valueClass: Class[_], // Value類型outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 輸出格式OutputFormat實現conf: Configuration = self.context.hadoopConfiguration // 配置信息): Unit*/putsRDD.saveAsNewAPIHadoopFile("datas/spark/htb-output-" + System.nanoTime(), //classOf[ImmutableBytesWritable], //classOf[Put], //classOf[TableOutputFormat[ImmutableBytesWritable]], //conf)// 應用程序運行結束,關閉資源sc.stop()}
}

運行完成以后,使用hbase shell查看數據:

?

?

???????HBase Source

回顧MapReduce從讀HBase表中的數據,使用TableMapper,其中InputFormat為TableInputFormat,讀取數據Key:ImmutableBytesWritable,Value:Result。

???從HBase表讀取數據時,同樣需要設置依賴Zookeeper地址信息和表的名稱,使用Configuration設置屬性,形式如下:

?

?????此外,讀取的數據封裝到RDD中,Key和Value類型分別為:ImmutableBytesWritable和Result,不支持Java Serializable導致處理數據時報序列化異常。設置Spark Application使用Kryo序列化,性能要比Java 序列化要好,創建SparkConf對象設置相關屬性,如下所示:

?

范例演示:從HBase表讀取詞頻統計結果,代碼如下

package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 從HBase 表中讀取數據,封裝到RDD數據集*/
object SparkReadHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取HBase Client 配置信息val conf: Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 設置讀取的表的名稱conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")/*def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,fClass: Class[F],kClass: Class[K],vClass: Class[V]): RDD[(K, V)]*/val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])println(s"Count = ${resultRDD.count()}")resultRDD.take(5).foreach { case (rowKey, result) =>println(s"RowKey = ${Bytes.toString(rowKey.get())}")// HBase表中的每條數據封裝在result對象中,解析獲取每列的值result.rawCells().foreach { cell =>val cf = Bytes.toString(CellUtil.cloneFamily(cell))val column = Bytes.toString(CellUtil.cloneQualifier(cell))val value = Bytes.toString(CellUtil.cloneValue(cell))val version = cell.getTimestampprintln(s"\t $cf:$column?= $value, version = $version")}}// 應用程序運行結束,關閉資源sc.stop()}
}

運行結果:

?

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(二十):Spark Core外部数据源引入的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。