HBase Data Import
1. Background
In production, massive datasets are generally not written into HBase row by row. The previous post covered loading and creating data through the Java API or the HBase shell, but for bulk imports at production scale those approaches are too slow, so other techniques are needed. Since HBase stores its data on HDFS as HFiles, a much faster route is to generate HFiles directly and then have HBase adopt them, instead of pushing every record through the normal write path.
2. Bulk import approaches
- Use the shell with the bundled tools to load existing HFiles directly into HBase.
- Write a MapReduce job that generates HFiles, then use the shell tool above to load those HFiles into HBase.
- Write a MapReduce job that writes the file data into HBase directly.
- Copy data from one place in HBase (namespace, table) to another place in HBase (namespace, table).
3. Import walkthrough
3.1 Load HFiles directly into HBase with the shell tool
3.1.1 Convert a CSV file to HFiles
Prepare the data (it must be in CSV format).
Create the file on a Windows machine or a Linux node and upload it to HDFS with hdfs dfs -put. Creating it directly on the Linux node is simpler, since it avoids the extra step of copying it from Windows to the Linux server first.
The file used here is bulkload.csv:
1,lenovo,black,3000,intel
2,lenovo,black,43000,intel
3,xiaomi,black,3500,amd
4,lenovo,black,3000,amd
5,huawei,black,3000,amd
6,xiaomi,black,3000,arm
7,xiaomi,black,3000,arm
8,lenovo,black,6000,arm
9,huawei,black,5000,intel
10,huawei,black,3000,intel
11,huawei,black,7000,intel
12,huawei,black,3000,intel
13,hp,black,3000,amd
14,dell,black,3000,amd
15,huawei,black,3000,amd
16,apple,silver,544000,arm
17,mechanic,black,3000,arm
18,mechnic,black,3000,amd
19,hasee,black,12000,amd
20,hasee,black,23000,amd
21,hp,black,3000,intel
22,acer,black,3000,intel
23,acer,black,33000,intel
24,dell,black,30040,intel
25,dell,black,3000,intel
26,huawei,white,3000,amd
27,founder,blue,30060,intel
28,huawei,pink,3000,intel
29,huawei,black,30300,intel
30,huawei,black,3000,arm
31,huawei,black,33000,arm
32,lenovo,black,30200,arm
33,huawei,black,33000,intel
34,lenovo,black,3000,intel
35,huawei,black,30500,intel
36,lenovo,black,3000,intel
Create the input directory on HDFS:
hdfs dfs -mkdir -p /csv/input
Upload the CSV file from the Linux node into the HDFS input directory:
hdfs dfs -put bulkload.csv /csv/input
After the upload, list the directory to confirm the file is there:
hdfs dfs -ls /csv/input
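If you run ImportTsv with -Dcreate.table=no, or simply want the table laid out up front, the target table must already exist. A minimal sketch of creating it from the hbase shell, assuming the doit namespace does not exist yet:
create_namespace 'doit'
create 'doit:tb_computer_info', 'cf1', 'cf2'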
Run the following command on one of the HDFS cluster nodes:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=, \
-Dimporttsv.columns='HBASE_ROW_KEY,cf1:brand,cf1:color,cf1:price,cf2:cpu_brand' \
-Dimporttsv.bulk.output=/csv/output \
doit:tb_computer_info \
/csv/input
The main ImportTsv options are:
- -Dimporttsv.skip.bad.lines=false - fail the job when an invalid line is encountered
- -Dimporttsv.separator=, - use a specific field separator (the default is the tab character \t)
- -Dimporttsv.timestamp=currentTimeAsLong - use the import time as the cell timestamp
- -Dimporttsv.mapper.class=my.Mapper - use a user-defined Mapper class instead of TsvImporterMapper
- -Dmapreduce.job.name=jobName - set a specific MapReduce job name for the import
- -Dcreate.table=no - do not create the table; note: if set to no, the target table must already exist in HBase
- -Dno.strict=true - skip the column-family check against the HBase table (default is false)
- -Dimporttsv.bulk.output=/user/yarn/output - output directory for the job's HFiles
- After typing the command, press Enter to run it.
- The job logs show that ImportTsv is really just a MapReduce job.
- Inspect the output directory when it finishes.
One folder is generated per column family (two here), which matches the directory layout HBase itself uses for table data on HDFS.
Open the cf1 folder and you will find a single file: this is the generated HFile.
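To inspect the generated output from the command line:
hdfs dfs -ls -R /csv/output
# expect one subdirectory per column family (cf1 and cf2), each holding the generated HFile(s)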
3.1.2 Load the HFiles into HBase
Run the bulk-load tool against the output directory:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /csv/output doit:tb_computer_info
- The log output shows that this step is handled inside HBase itself rather than by a MapReduce job.
- Check the data in the target HBase table.
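For example, a quick look from the hbase shell (the LIMIT value is arbitrary):
scan 'doit:tb_computer_info', {LIMIT => 5}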
3.2 Import into HBase with a MapReduce job
3.2.1 Environment preparation
pom.xml:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.2.5</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>5.0.0-HBase-2.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Cluster environment:
- HDFS cluster
- ZooKeeper cluster
- HBase cluster
All of them need to be running. Start HDFS first, then ZooKeeper, then HBase.
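With standard tarball installs, the startup might look roughly like this (script locations and which node you run them on depend on your installation; this is only a sketch):
start-dfs.sh          # bring up the HDFS cluster
zkServer.sh start     # run on every ZooKeeper node
start-hbase.sh        # start HBase once HDFS and ZooKeeper are up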
Development environment: Windows 10 with IDEA 2020, JDK 1.8 and Maven installed.
Data preparation: some JSON data is generated by hand, one record per line, for example:
{"movie":"2294","rate":"4","timeStamp":"978824291","uid":"1"}
{"movie":"3186","rate":"4","timeStamp":"978300019","uid":"1"}
{"movie":"1566","rate":"a","timeStamp":"978824330","uid":"1"}
{"movie":"588","rate":"4","timeStamp":"978824268","uid":"1"}
{"movie":"1907","rate":"4","timeStamp":"978824330","uid":"1"}
{"movie":"783","rate":"4","timeStamp":"978824291","uid":"1"}
{"movie":"1836","rate":"5","timeStamp":"978300172","uid":"1"}
{"movie":"1022","rate":"5","timeStamp":"978300055","uid":"1"}
{"movie":"2762","rate":"4","timeStamp":"978302091","uid":"1"}
{"movie":"150","rate":"5","timeStamp":"978301777","uid":"1"}
3.2.2 Import design
When loading external data into HBase, keep in mind that HBase itself does not support multi-dimensional queries, so the table fields and structure have to be designed up front. The external data may also contain errors or missing fields, which can be handled either by preprocessing or inside the MapReduce job itself.
Rowkey design:
- The assumed access pattern is lookups by movie id, so the movie id must be part of the rowkey.
- Movie ids have different lengths, so they are padded to a fixed length. Padding with leading zeros preserves the original meaning of the id; padding at the end would not.
- Looking at the data, using the movie id alone as the rowkey would cause records to overwrite each other, so a suffix is appended. The rate and uid values repeat, but the timestamp does not, so the timestamp is chosen as the suffix to make the rowkey unique. Each input record then maps to exactly one HBase row and nothing is overwritten.
Region splitting: the generated dataset is fairly large (around one million rows). To spread it relatively evenly across region servers while keeping related rows together, consider pre-splitting the table based on the rowkey design, for example by passing SPLITS when creating it. This article does not walk through that in detail, but a rough sketch follows below.
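The reducer below writes to the table doit:movie with column family cf1, so that table has to exist before the job runs. A minimal sketch of a pre-split create in the hbase shell, assuming rowkeys begin with the 5-digit zero-padded movie id (the split points are illustrative only):
create 'doit:movie', 'cf1', SPLITS => ['10000', '20000', '30000']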
3.2.3 Code
Since the input is JSON, a Java bean is used to hold each record. The bean takes part in the MapReduce shuffle, so it must implement Writable. The map phase parses each text line into a bean and builds the rowkey described above; the reduce phase takes the map output keys and values and writes them either to files on HDFS or, as here, directly into HBase.
import java.io.IOException;

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses one JSON line into a MovieBean and emits (rowkey, bean).
class HBaseDataImportMapper extends Mapper<LongWritable, Text, Text, MovieBean> {
    Gson gson = new Gson();
    Text rowkeyText = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            String line = value.toString();
            MovieBean movieBean = gson.fromJson(line, MovieBean.class);
            String movie = movieBean.getMovie();
            String timeStamp = movieBean.getTimeStamp();
            // Left-pad the movie id with zeros to 5 characters and append the
            // timestamp so that rowkeys are unique and sort by movie id.
            String rowkey = StringUtils.leftPad(movie, 5, '0') + "_" + timeStamp;
            rowkeyText.set(rowkey);
            context.write(rowkeyText, movieBean);
        } catch (Exception e) {
            // Malformed records (e.g. a non-numeric rate) are skipped.
            e.printStackTrace();
        }
    }
}
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

// Turns each MovieBean into a Put against column family cf1 and writes it to HBase.
class HBaseDataImportReducer extends TableReducer<Text, MovieBean, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<MovieBean> values, Context context)
            throws IOException, InterruptedException {
        try {
            String rowkey = key.toString();
            for (MovieBean value : values) {
                String movie = value.getMovie();
                double rate = value.getRate();
                String timeStamp = value.getTimeStamp();
                String uid = value.getUid();

                Put put = new Put(Bytes.toBytes(rowkey));
                put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("movie"), Bytes.toBytes(movie));
                put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("rate"), Bytes.toBytes(rate));
                put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("timeStamp"), Bytes.toBytes(timeStamp));
                put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("uid"), Bytes.toBytes(uid));
                // TableOutputFormat ignores the output key, so null is acceptable here.
                context.write(null, put);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HBaseDataImportDriver {   // enclosing class name assumed; the post only shows main()
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");
        try {
            Job job = Job.getInstance(conf);
            job.setMapperClass(HBaseDataImportMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(MovieBean.class);
            FileInputFormat.setInputPaths(job, new Path("E:\\movie\\input"));
            // Wire the reducer to the target HBase table doit:movie.
            TableMapReduceUtil.initTableReducerJob("doit:movie", HBaseDataImportReducer.class, job);
            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("success");
            } else {
                System.out.println("failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Holds one rating record; implements Writable so it can be shuffled between map and reduce.
public class MovieBean implements Writable {
    private String movie;
    private double rate;
    private String timeStamp;
    private String uid;

    @Override
    public String toString() {
        return "MovieBean{" +
                "movie='" + movie + '\'' +
                ", rate=" + rate +
                ", timeStamp='" + timeStamp + '\'' +
                ", uid='" + uid + '\'' +
                '}';
    }

    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public String getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization order must match readFields().
        out.writeUTF(movie);
        out.writeDouble(rate);
        out.writeUTF(timeStamp);
        out.writeUTF(uid);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        movie = in.readUTF();
        rate = in.readDouble();
        timeStamp = in.readUTF();
        uid = in.readUTF();
    }
}
After the import finishes, open the hbase shell and check the table with the count command; it should report roughly one million rows.
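count scans the whole table, which is slow for a million rows with the default settings; a sketch with a larger scanner cache and progress interval (the values are arbitrary):
count 'doit:movie', INTERVAL => 100000, CACHE => 10000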
3.3 Copy data from one HBase table to another
3.3.1 Environment preparation
Prepare an HBase table user with column family f and columns name and gender, and an empty table user2 with column family f.
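A minimal sketch of preparing the two tables in the hbase shell; the row key and cell values are made-up samples:
create 'user', 'f'
create 'user2', 'f'
put 'user', 'rk001', 'f:name', 'zhangsan'
put 'user', 'rk001', 'f:gender', 'male'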
3.3.2 Java code
Driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseTransferFromHBase {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");
        Job job = null;
        try {
            job = Job.getInstance(conf);
            // Scan settings for the map-side read of the source table.
            Scan scan = new Scan();
            scan.setCaching(200);
            scan.setCacheBlocks(false);
            job.setJarByClass(HBaseTransferFromHBase.class);
            // Read from table "user", write to table "user2".
            TableMapReduceUtil.initTableMapperJob("user", scan,
                    HBaseTransferFromHBaseMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.initTableReducerJob("user2",
                    HBaseTransferFromHBaseReducer.class, job);
            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("ok");
            } else {
                System.out.println("not ok");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Mapper class:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

// Reads each row of the source table and emits (rowkey, "name:gender").
class HBaseTransferFromHBaseMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        String k = new String(key.copyBytes());
        String name = Bytes.toString(value.getValue("f".getBytes(), "name".getBytes()));
        String gender = Bytes.toString(value.getValue("f".getBytes(), "gender".getBytes()));
        System.out.println(k + " " + name);
        context.write(new Text(k), new Text(name + ":" + gender));
    }
}
Reducer class:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Rebuilds a Put for the target table from the "name:gender" value emitted by the mapper.
class HBaseTransferFromHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> iters,
                          Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        // Use Bytes.toBytes(key.toString()) rather than key.getBytes(): Text.getBytes()
        // returns the whole backing array, which may be longer than the actual value
        // and would corrupt the rowkey.
        Put put = new Put(Bytes.toBytes(key.toString()));
        Text next = iters.iterator().next();
        String v = next.toString();
        String[] split = v.split(":");
        String name = split[0];
        String gender = split[1];
        put.addColumn("f".getBytes(), "name".getBytes(), Bytes.toBytes(name));
        put.addColumn("f".getBytes(), "gender".getBytes(), Bytes.toBytes(gender));
        context.write(null, put);
    }
}
When the job finishes, run a scan against user2 in the hbase shell; the data has been copied over completely. If you need tighter control, you can restrict the copy to rows within a given key range; that is not demonstrated here, but it is easy to try yourself (a shell-side range check is sketched below).
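If you only want to spot-check part of the key space during verification, the shell scan also accepts a row range (the row keys here are illustrative):
scan 'user2', {STARTROW => 'rk001', STOPROW => 'rk100'}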
4. Summary
The above covers three ways to bulk-load data into HBase: converting CSV to HFiles with ImportTsv and loading them with LoadIncrementalHFiles, writing data into HBase directly from a MapReduce job, and copying data from one HBase table to another with a TableMapper/TableReducer job.