Spark to HBase: Writing Data and Querying It
This example is written in Scala. It shows two ways of writing RDD data into HBase, then reads the data back from HBase and prints it.
build.sbt
name := "SparkSbt"version := "0.1"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0" libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0" libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"?
First, create the tables from the hbase shell:
create 'account', 'cf'
create 'account2', 'cf'
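The same tables can also be created programmatically with the HBase 1.2 client API instead of the shell. This is a minimal sketch, assuming the same ZooKeeper quorum as the main example; the CreateTables object name is only for illustration:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.91.144")      // same quorum as the main example
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      for (name <- Seq("account", "account2")) {
        val table = TableName.valueOf(name)
        if (!admin.tableExists(table)) {
          // one column family 'cf', matching the shell commands above
          val descriptor = new HTableDescriptor(table)
          descriptor.addFamily(new HColumnDescriptor("cf"))
          admin.createTable(descriptor)
        }
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}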
Source code
package com.whq.test

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._

object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
    val sc = new SparkContext(sparkConf)

    // Please ensure HBASE_CONF_DIR is on the classpath of the Spark driver.
    val conf = HBaseConfiguration.create()
    // Set the ZooKeeper quorum. You could also put hbase-site.xml on the classpath,
    // but setting it in code keeps the example self-contained.
    conf.set("hbase.zookeeper.quorum", "192.168.91.144")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // ---------- Ingestion method 1: saveAsHadoopDataset ----------
    println("————————————Ingestion method 1")
    var tablename = "account"
    conf.set(TableInputFormat.INPUT_TABLE, tablename)

    // Initialize the JobConf; here TableOutputFormat must be the one
    // from the org.apache.hadoop.hbase.mapred (old API) package!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Data to be written
    val indataRDD = sc.makeRDD(Array("11,whq,30", "12,wanghongqi,29", "13,xiaoming,15"))

    // Convert the data into RDD[(ImmutableBytesWritable, Put)]
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      /* One Put object is one row; the row key is passed to the constructor.
       * Every value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, column qualifier, value. */
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // Only an RDD[(ImmutableBytesWritable, Put)] can call saveAsHadoopDataset
      (new ImmutableBytesWritable, put)
    }
    // Write to HBase
    rdd.saveAsHadoopDataset(jobConf)

    // ---------- Ingestion method 2: saveAsNewAPIHadoopDataset ----------
    println("————————————Ingestion method 2")
    tablename = "account2"
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job2 = Job.getInstance(conf)
    job2.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job2.setOutputValueClass(classOf[Result])
    // This time TableOutputFormat comes from the org.apache.hadoop.hbase.mapreduce (new API) package
    job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

    val rdd2 = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }
    rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())

    // ---------- Reading data ----------
    println("————————————Reading data")
    conf.set(TableInputFormat.INPUT_TABLE, tablename)
    // Read the table into an RDD
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println(count)

    hBaseRDD.collect().foreach { case (_, result) =>
      // Row key
      val key = Bytes.toString(result.getRow)
      // Fetch each column by family and qualifier
      val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }

    sc.stop()
  }
}

Run the command:
spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar
Check the data in the hbase shell:
scan 'account'
scan 'account2'
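Since build.sbt already declares spark-sql, the rows read back into hBaseRDD can also be queried with Spark SQL instead of being printed one by one. The snippet below is a minimal sketch for Spark 1.6, meant to go at the end of main in HBaseTest after hBaseRDD is created; the Account case class and the account_view table name are illustrative, and the case class should be defined at the top level (next to HBaseTest) so Spark can derive its schema.

import org.apache.spark.sql.SQLContext

case class Account(id: String, name: String, age: Int)   // define at top level, outside main

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Map each HBase Result into an Account row and turn the RDD into a DataFrame
val accountDF = hBaseRDD.map { case (_, result) =>
  Account(
    Bytes.toString(result.getRow),
    Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))),
    Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age"))))
}.toDF()

// Register a temporary table and run an ad-hoc SQL query
accountDF.registerTempTable("account_view")
sqlContext.sql("SELECT name, age FROM account_view WHERE age >= 29").show()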
Summary

The example writes the same RDD into the account and account2 tables with the old-API saveAsHadoopDataset and the new-API saveAsNewAPIHadoopDataset respectively, then reads account2 back through newAPIHadoopRDD and prints each row.