日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

HBase数据大批量导入方式总结和对比

發布時間:2023/12/20 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HBase数据大批量导入方式总结和对比 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

HBase數據導入

1. 背景

  • 在實際生產中,海量數據一般都不是直接存儲在HBase中,這時候就需要一個數據導入到HBase的步驟
  • 上一篇博客講述了可以通過java api的方式或者shell 客戶端方式導入或者創建數據,但這對于實際生產中海量數據導入來說,速度和效率都太慢了,所以我們需要使用其他方式來解決海量輸入導入到HBase的問題
  • 利用HBase底層文件是HFile形式存儲再HDFS中,所以如果能夠直接生成HFile的話,這時候再讓HBase從HFile中讀取數據,就會快很多。
  • 2. 批量數據導入方式

  • shell 腳本命令,使用工具將HFile直接導入到HBase中
  • 編寫mapreduce程序,生成Hfile。然后再使用上shell方式,將HFile文件導入到HBase中
  • 編寫mapreduce程序,直接將文件中數據寫入到Hbase中
  • 將數據從HBase中一個地方(namespace、table)導入到HBase另外一個地方(namespace、table)
  • 3.數據導入步驟

    3.1 shell 腳本命令,使用工具將HFile直接導入到HBase中

    3.1.1 將csv文件轉為HFile

  • 數據準備(數據形式需要是csv格式的數據)

    在windows或者linux節點服務器上準備好,然后通過hdfs dfs -put指令上傳到hdfs文件系統中。這里選擇在linux系統上創建,可以省去在windows上創建后還要再從windows上傳到linux節點服務器的步驟
    文件名這里是:bulkload.csv
  • 1,lenovo,black,3000,intel 2,lenovo,black,43000,intel 3,xiaomi,black,3500,amd 4,lenovo,black,3000,amd 5,huawei,black,3000,amd 6,xiaomi,black,3000,arm 7,xiaomi,black,3000,arm 8,lenovo,black,6000,arm 9,huawei,black,5000,intel 10,huawei,black,3000,intel 11,huawei,black,7000,intel 12,huawei,black,3000,intel 13,hp,black,3000,amd 14,dell,black,3000,amd 15,huawei,black,3000,amd 16,apple,silver,544000,arm 17,mechanic,black,3000,arm 18,mechnic,black,3000,amd 19,hasee,black,12000,amd 20,hasee,black,23000,amd 21,hp,black,3000,intel 22,acer,black,3000,intel 23,acer,black,33000,intel 24,dell,black,30040,intel 25,dell,black,3000,intel 26,huawei,white,3000,amd 27,founder,blue,30060,intel 28,huawei,pink,3000,intel 29,huawei,black,30300,intel 30,huawei,black,3000,arm 31,huawei,black,33000,arm 32,lenovo,black,30200,arm 33,huawei,black,33000,intel 34,lenovo,black,3000,intel 35,huawei,black,30500,intel 36,lenovo,black,3000,intel
  • 在hdfs上創建輸入文件的文件夾
  • hdfs dfs -mkdir -p /csv/input
  • 將linux節點服務器上的csv文件上傳到hdfs的輸入文件文件夾中
  • hdfs dfs -put bulkload.csv /csv/input-- 上傳之后,輸入以下shell命令查看文件 hdfs dfs -ls /csv/input

    文件上傳后,查看上傳結果

    4. 在hdfs集群節點服務器上執行以下shell命令

    • 指定的shell命令
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \ -Dimporttsv.separator=, \ -Dimporttsv.columns='HBASE_ROW_KEY,cf1:brand,cf1:color,cf1:price,cf2:cpu_brand' \ -Dimporttsv.bulk.output=/csv/output \ doit:tb_computer_info \ /csv/input# org.apache.hadoop.hbase.mapreduce.ImportTsv這個是進行導入數據的工具類的類型,類似于javca執行一個jar包時,需要指定一個main方法的類的全類名 # Dimporttsv.separator=, 這里是csv文件的分割符號,csv文件按照百度百科標準,一般都是以逗號進行分割,但實際可以使用其他分隔符號 # Dimporttsv.columns='HBASE_ROW_KEY,cf1:brand,cf1:color,cf1:price,cf2:cpu_brand' 這里就是將csv文件中一行數據的每個字段對應在hbase的表中的字段進行指定,使用逗號隔開,例如第一個就是HBASE_ROW_KEY,第二個就是cf1:brand(就是列族cf1下的brand字段) # doit:tb_computer_info這是hbase中的哪個表,如果需要指定namespace,前面加上namespace名再加上冒號。不指定就默認是default這個namespace中的表。注意這個表需在執行shell命令前建立好 # /csv/input這個是輸入數據源路徑 # -Dimporttsv.bulk.output=/csv/output 這個是輸出數據源路徑 # \ 反斜杠是因為shell命令一行太長,使用反斜杠進行分割
    • Dimporttsv的參數說明

    -Dimporttsv.skip.bad.lines=false - 若遇到無效行則失敗
    -Dimporttsv.separator=, - 使用特定分隔符,默認是tab也就是\t
    -Dimporttsv.timestamp=currentTimeAsLong - 使用導入時的時間戳
    -Dimporttsv.mapper.class=my.Mapper - 使用用戶自定義Mapper類替換TsvImporterMapper
    -Dmapreduce.job.name=jobName - 對導入使用特定mapreduce作業名
    -Dcreate.table=no - 避免創建表,注:如設為為no,目標表必須存在于HBase中
    -Dno.strict=true - 忽略HBase表列族檢查。默認為false
    -Dimporttsv.bulk.output=/user/yarn/output 作業的輸出目錄

    • 輸入指令后,開始執行,按下enter鍵執行
    • 執行過程日志,可以看出其實就是執行了一個mapreduce的程序



    • 查看輸出的文件
      注意這里按照列族數量,生成了2個文件夾,和hbase存儲在hdfs中的文件規則是一致的。

      注意點擊進去cf1文件夾查看,可以看到有一個文件,這就是生成的hfile文件

    3.1.2 將HFile導入到HBase中

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /csv/output doit:tb_computer_info
    • 輸入指令后,可以看到如下日志,可以看出這時候就是HBase內部的程序,而不是mapreduce
    • 查看hbase中對應表格中數據

    3.2 使用mapreduce方式導入hbase

    3.2.1 環境準備

  • pom文件
  • <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties> <dependencies> <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version> </dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.2.1</version> </dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>compile</scope> </dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.2.5</version> </dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version> </dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version> </dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.2.5</version> </dependency><!-- 使用mr程序操作hbase 數據的導入 --> <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-mapreduce</artifactId><version>2.2.5</version> </dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version> </dependency><!-- phoenix 鳳凰 用來整合Hbase的工具 --> <dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId><version>5.0.0-HBase-2.0</version> </dependency></dependencies><build> <plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><!-- bind to the packaging phase --><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins> </build>
  • 集群環境準備
    • hdfs集群 hdfs
    • zookeeper集群 zk
    • hbase集群 hbase
      注意,需要都啟動,啟動順序,先啟動hdfs,然后是zookeeper,然后是hbase
  • windows10,安裝好 idea 2020版本和jdk 1.8 JDK8
  • maven環境 maven
  • 數據準備, 這里手動制造一些json數據,一行一行存放。
  • {"movie":"2294","rate":"4","timeStamp":"978824291","uid":"1"} {"movie":"3186","rate":"4","timeStamp":"978300019","uid":"1"} {"movie":"1566","rate":"a","timeStamp":"978824330","uid":"1"} {"movie":"588","rate":"4","timeStamp":"978824268","uid":"1"} {"movie":"1907","rate":"4","timeStamp":"978824330","uid":"1"} {"movie":"783","rate":"4","timeStamp":"978824291","uid":"1"} {"movie":"1836","rate":"5","timeStamp":"978300172","uid":"1"} {"movie":"1022","rate":"5","timeStamp":"978300055","uid":"1"} {"movie":"2762","rate":"4","timeStamp":"978302091","uid":"1"} {"movie":"150","rate":"5","timeStamp":"978301777","uid":"1"}

    3.2.2 數據導入思路

  • 從外部導入數據到HBase中,由于HBase本身不支持多維度的數據查詢,所以需要進行HBase表數據字段和結構設計
  • 外部數據可能有錯誤或者缺失信息,需要考慮如何處理(數據預處理或者在mapreduce中處理都可以)
  • rowkey設計
    • 這里假定業務場景是根據movie id進行數據查詢,所以rowkey的設計需要把movie id放進去,
    • 同時movie id長度不一,這時候采取補位處理,讓所有movie id的長度一致。考慮到不損失其原本含義,采用前面補0,否則后面補0會損失id的原本含義。
    • 看數據,如果采用movie id作為rowkey,則數據會發生覆蓋,這時候如果把rate、時間戳、uid作為后綴加進去。uid和rate會有重復,但時間戳沒有,所以選擇時間戳作為后綴來確保rowkey的唯一性。這樣一條數據對應hbase中一條數據,不會發生數據覆蓋問題。
  • 數據分區考慮,注意這里的數據(手動造了100萬條左右),數據量比較大,為了合理存放這些數據,也就是相對均勻劃分到不同region server上,同時確保相關數據盡可能在一起。可以考慮在設計好的rowkey基礎上,看是否做分區處理。如果需要,可以執行SPLITS設置。本文這里暫不演示
  • 3.2.3 代碼實現

  • 因為數據是json格式,所以需要有一個java bean存放數據。這個java bean會參與mapreduce過程,所以需要遵守writable協議
  • map過程就是把一行一行文本轉換為一個一個的java bean,注意這里的rowkey需要
  • reduce就是把map階段產生的key、value進行輸出,寫入到hdfs的文件中或者直接寫入hbase中
    • mapper
    class HBaseDataImportMapper extends Mapper<LongWritable, Text, Text, MovieBean> {// 數據解析使用GSONGson gson = new Gson();Text rowkeyText = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try{// 讀取一行一行的數據String line = value.toString();// 使用GSON進行解析MovieBean movieBean = gson.fromJson(line, MovieBean.class);// 拼接處理rowkeyString movie = movieBean.getMovie();String timeStamp = movieBean.getTimeStamp();String rowkey = StringUtils.leftPad(movie, 5, '0') + "_" + timeStamp;// 這里復用Text對象rowkeyText.set(rowkey);// 輸出context.write(rowkeyText, movieBean);}catch (Exception e) {e.printStackTrace();}} }
    • reducer
    class HBaseDataImportReducer extends TableReducer<Text, MovieBean, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {try {String rowkey = key.toString();for (MovieBean value : values) {// 獲取java bean中的屬性String movie = value.getMovie();double rate = value.getRate();String timeStamp = value.getTimeStamp();String uid = value.getUid();// 構建一個HBase數據操作的Put對象// 傳入rowkey,這是一行的rowkeyPut put = new Put(Bytes.toBytes(rowkey));// 設置列族、字段等信息put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("movie"), Bytes.toBytes(movie));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("rate"), Bytes.toBytes(rate));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("timeStamp"), Bytes.toBytes(timeStamp));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("uid"), Bytes.toBytes(uid));// 輸出context.write(null, put);}}catch (Exception e) {e.printStackTrace();}} }
    • driver
    public static void main(String[] args) {// 獲取HBaseConfiguration對象,這個本身就是繼承自hadoop的Configuration 類Configuration conf = HBaseConfiguration.create();// 設置hbase的zookeeper入口(hbase的元數據服務器節點位置等信息是保存在zookeeper上的)conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");try {// 創建一個job對象,代表一個完整的mapreduce程序Job job = Job.getInstance(conf);// 設置mapper類job.setMapperClass(HBaseDataImportMapper.class);// 設置map階段輸出的key value類job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(MovieBean.class);// 設置輸入數據源FileInputFormat.setInputPaths(job, new Path("E:\\movie\\input"));// 這里設置reducer的類、需要導出的hbase表名,"doit:movie"中doit是namespace,movie是表名TableMapReduceUtil.initTableReducerJob("doit:movie", HBaseDataImportReducer.class, job);boolean b = job.waitForCompletion(true);if(b) {System.out.println("成功了");} else {System.out.println("失敗了");}} catch (Exception e) {e.printStackTrace();}}

    • java bean
    public class MovieBean implements Writable {// {"movie":"2294","rate":"4","timeStamp":"978824291","uid":"1"}private String movie;private double rate;private String timeStamp;private String uid;@Overridepublic String toString() {return "MovieBean{" +"movie='" + movie + '\'' +", rate=" + rate +", timeStamp='" + timeStamp + '\'' +", uid='" + uid + '\'' +'}';}public String getMovie() {return movie;}public void setMovie(String movie) {this.movie = movie;}public double getRate() {return rate;}public void setRate(double rate) {this.rate = rate;}public String getTimeStamp() {return timeStamp;}public void setTimeStamp(String timeStamp) {this.timeStamp = timeStamp;}public String getUid() {return uid;}public void setUid(String uid) {this.uid = uid;}/*** private String movie;* private double rate;* private String timeStamp;* private String uid;** 注意這里的讀寫字段的順序需要一致* */@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(movie);out.writeDouble(rate);out.writeUTF(timeStamp);out.writeUTF(uid);}@Overridepublic void readFields(DataInput in) throws IOException {movie = in.readUTF();rate = in.readDouble();timeStamp = in.readUTF();uid = in.readUTF();} }

    導入后,去hbase查看,進入hbase shell ,使用count指令查看。百萬條數據

    3.3從hbase中將數據導入另一個hbase表格中

    3.1環境準備

  • 準備好hbase中對應的表user,列族是f,字段是name和gender
  • 準備好一張空表user2,列族是f
  • 3.2 java代碼

  • driver類
  • public class HBaseTransferFromHBase {public static void main(String[] args) {// 獲取整合的初始化對象Configuration conf = HBaseConfiguration.create();// 連接zookeeper集群的的位置,給多個集群節點地址,這樣就算一個無法連接,還可以連接其他zookeeper集群的節點conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");// 獲取job對象Job job = null;try {job = Job.getInstance(conf);// 創建掃描對象用來掃描源hbase中的所有的數據Scan scan = new Scan();// 接收的掃描的數據的行數scan.setCaching(200);scan.setCacheBlocks(false);job.setJarByClass(HBaseTransferFromHBase.class);// 初始化 源表,這里沒有寫namespace,默認就是defaul表格中TableMapReduceUtil.initTableMapperJob("user", scan, HBaseTransferFromHBaseMapper.class, Text.class, Text.class, job);// 插入數據的表要存在TableMapReduceUtil.initTableReducerJob("user2", HBaseTransferFromHBaseReducer.class, job);boolean b = job.waitForCompletion(true);if (b){System.out.println("ok");} else {System.out.println("not ok");}} catch (Exception e) {e.printStackTrace();}} }
  • mapper類
  • class HBaseTransferFromHBaseMapper extends TableMapper<Text, Text> {// 參數一是 rowkey 參數二是結果 參數三是輸出的key 參數四是 輸出的value@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {// 獲取字符串的 rowkeyString k = new Text(new String(key.copyBytes())).toString();// 獲取指定的屬性的值String name = Bytes.toString(value.getValue("f".getBytes(), "name".getBytes()));String gender = Bytes.toString(value.getValue("f".getBytes(), "gender".getBytes()));System.out.println(k + " " + name);// 以行鍵為key 以多個屬性組裝的結果為value傳遞 到reduce中context.write(new Text(k), new Text(name + ":" + gender));} }
  • reducer類
  • class HBaseTransferFromHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<Text> iters,Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)throws IOException, InterruptedException {//創建put對象Put put = new Put(key.getBytes());// 獲取接收的map的value值Text next = iters.iterator().next();// 將value轉換成字符串String v = next.toString();//處理字符串獲取 各個屬性的值String[] split = v.split(":");String name = split[0];String gender = split[1];// 將各個屬性的值添加到對應的列中put.addColumn("f".getBytes(), "name".getBytes(), Bytes.toBytes(name));put.addColumn("f".getBytes(), "gender".getBytes(), Bytes.toBytes(gender));// 將put對象寫出去context.write(null, put);} }
  • 運行結束后,如下圖所示,hbase的shell客戶端中執行scan 掃描如下,確實完整導入了。如果需要更多限制,可以限制指定范圍內的row才導入,這里就不做演示。可以自行嘗試
  • 總結

    以上是生活随笔為你收集整理的HBase数据大批量导入方式总结和对比的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。