日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark hbase

發布時間:2025/3/21 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark hbase 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1 配置

1.1 開發環境:

  • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
  • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
  • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
  • Spark:spark-2.1.0-bin-hadoop2.6

1.2 Spark的配置

  • Jar包:需要HBase的Jar如下(經過測試,正常運行,但是是否存在冗余的Jar并未證實,若發現多余的jar可自行進行刪除)

  • spark-env.sh
    添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
    注:如果使用spark-shell的yarn模式進行測試的話,那么最好每個NodeManager節點都有配置jars和hbase-site.xml
  • spark-default.sh
spark.yarn.historyServer.address=slave11:18080 spark.history.ui.port=18080 spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///tmp/spark/events spark.history.fs.logDirectory=hdfs:///tmp/spark/events spark.driver.memory=1g spark.serializer=org.apache.spark.serializer.KryoSerializer

1.3 數據

1)格式: barCode@item@value@standardValue@upperLimit@lowerLimit

01055HAXMTXG10100001@KEY_VOLTAGE_TEC_PWR@1.60@1.62@1.75@1.55
01055HAXMTXG10100001@KEY_VOLTAGE_T_C_PWR@1.22@1.24@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_BC_PWR@1.16@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_11@1.32@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_RC_PWR@1.24@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_VCC_5V@1.93@1.90@1.95@1.65
01055HAXMTXG10100001@KEY_VOLTAGE_T_VDD3V3@1.59@1.62@1.75@1.55

2 代碼演示

2.1 準備動作

1)既然是與HBase相關,那么首先需要使用hbase shell來創建一個表

創建表格:create ‘data’,’v’,create ‘data1’,’v’

2)使用spark-shell進行操作,命令如下:

bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2

3)import 各種類

import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.client.Get import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64,Bytes} import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.commons.codec.digest.DigestUtils

2.2 代碼實戰

創建conf和table

val conf= HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE,"data1") val table = new HTable(conf,"data1")

2.2.1 數據寫入

格式:

val put = new Put(Bytes.toBytes("rowKey")) put.add("cf","q","value")

使用for來插入5條數據

for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

到hbase shell中查看結果

2.2.2 數據讀取

val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

1)take

hbaseRdd take 1

2)scan

var scan = new Scan(); scan.addFamily(Bytes.toBytes(“v”)); var proto = ProtobufUtil.toScan(scan) var scanToString = Base64.encodeBytes(proto.toByteArray()); conf.set(TableInputFormat.SCAN,scanToString)val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))

2.3 批量插入

2.3.1 普通插入

1)代碼

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log") val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))} val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => { var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

2)執行時間如下:7.6 min

2.3.2 Bulkload

1) 代碼:

val conf = HBaseConfiguration.create(); val tableName = "data1" val table = new HTable(conf,tableName) conf.set(TableOutputFormat.OUTPUT_TABLE,tableName) lazy val job = Job.getInstance(conf) job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) job.setMapOutputValueClass(classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad(job,table) val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}} rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration()) val bulkLoader = new LoadIncrementalHFiles(conf) bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

2) 執行時間:7s


3)執行結果:
到hbase shell 中查看 list “data1”

通過對比我們可以發現bulkload批量導入所用時間遠遠少于普通導入,速度提升了60多倍,當然我沒有使用更大的數據量測試,但是我相信導入速度的提升是非常顯著的,強烈建議使用BulkLoad批量導入數據到HBase中。

轉載于:https://www.cnblogs.com/aibabel/p/10952247.html

總結

以上是生活随笔為你收集整理的spark hbase的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。