Common HBase Processing Patterns
This article walks through common ways of working with HBase: reading and writing HBase tables from MapReduce, bulk loading HFiles, integrating HBase with Hive, backing up and restoring table data, and the main options for building secondary indexes.
### Dependencies
~~~xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
~~~
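With these dependencies in place, here is a minimal sketch of the plain hbase-client API: opening a connection, writing a Put, and reading it back with a Get. The ZooKeeper quorum is the one used later in this article; the table `t1`, family `f1`, and the sample row are illustrative assumptions.

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum; adjust to your cluster
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t1"))) {
            // write one cell
            Put put = new Put(Bytes.toBytes("rk001"));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            table.put(put);
            // read it back
            Result result = table.get(new Get(Bytes.toBytes("rk001")));
            System.out.println(Bytes.toString(
                    result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
        }
    }
}
~~~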
### MapReduce: copying columns from one HBase table to another

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseMR {

    public static class HBaseMapper extends TableMapper<Text, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // row key as a byte array
            byte[] bytes = key.get();
            String rowkey = Bytes.toString(bytes);
            // build a Put for the same row key
            Put put = new Put(bytes);
            // all cells of the current row
            Cell[] cells = value.rawCells();
            for (Cell cell : cells) {
                // keep only family f1, columns name and age
                if ("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            if (!put.isEmpty()) {
                context.write(new Text(rowkey), put);
            }
        }
    }

    public static class HBaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Scan scan = new Scan();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseMR.class);
        // initialize the mapper with the TableMapReduceUtil helper
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf(args[0]), scan,
                HBaseMapper.class, Text.class, Put.class, job);
        // initialize the reducer with the TableMapReduceUtil helper
        TableMapReduceUtil.initTableReducerJob(args[1], HBaseReducer.class, job);
        // number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~

Package the program into a jar and submit it to the cluster to run.
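The Scan handed to initTableMapperJob can be tuned before the job is submitted. Below is a sketch of common settings, assuming only `f1:name` and `f1:age` are needed; the caching value of 500 and the helper class name are illustrative, not part of the original code.

~~~java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTuning {
    /** Builds the Scan passed to HBaseMR's initTableMapperJob; values are illustrative. */
    public static Scan buildScan() {
        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC; fewer round trips on a full-table scan
        scan.setCacheBlocks(false);  // keep a one-off scan from evicting the region server block cache
        // restrict the scan to the columns the mapper actually copies
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"));
        return scan;
    }
}
~~~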
### MapReduce: importing text files from HDFS into an HBase table

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class Hdfs2Hbase {

    public static class HdfsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // pass each input line through unchanged
            context.write(value, NullWritable.get());
        }
    }

    public static class HBASEReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // each line is expected to look like: "<rowkey> <name> <age>"
            String[] split = key.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Hdfs2Hbase.class);
        job.setInputFormatClass(TextInputFormat.class);
        // input file path on HDFS
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(HdfsMapper.class);
        // map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // target HBase table name and reducer
        TableMapReduceUtil.initTableReducerJob(args[1], HBASEReducer.class, job);
        // number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~

Package the program into a jar and submit it to the cluster to run.
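This job assumes the target table (args[1]) already exists with column family `f1`. Below is a minimal sketch of creating it up front with the Admin API; the class name and the existence check are illustrative additions.

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(args[0]);
            if (!admin.tableExists(tableName)) {
                // one column family, f1, matching what the reducer writes
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("f1"));
                admin.createTable(desc);
            }
        }
    }
}
~~~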
### Bulk load, step 1: generating HFiles with HFileOutputFormat2

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HBaseLoad {

    public static class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line is expected to look like: "<rowkey> <name> <age>"
            String[] split = value.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH = "hdfs://node1:9000/input";
        final String OUTPUT_PATH = "hdfs://node1:9000/output_HFile";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t4"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseLoad.class);
        job.setMapperClass(LoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // write HFiles instead of going through the region servers
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table,
                connection.getRegionLocator(TableName.valueOf("t4")));
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~

Package the program into a jar and submit it to the cluster to run.
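Before loading, the generated output can be inspected: HFileOutputFormat2 writes one subdirectory per column family (here `f1`) under the output path. A small sketch that lists those files; the class name and the hard-coded path mirror the job above and are illustrative.

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the bulk-load output directory used above; adjust to your cluster
        Path out = new Path("hdfs://node1:9000/output_HFile");
        FileSystem fs = FileSystem.get(out.toUri(), conf);
        // expect one subdirectory per column family containing the HFiles
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(out, true);
        while (it.hasNext()) {
            System.out.println(it.next().getPath());
        }
    }
}
~~~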
### Bulk load, step 2: loading the HFiles with LoadIncrementalHFiles

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // connection to the cluster
        Connection connection = ConnectionFactory.createConnection(configuration);
        // admin handle
        Admin admin = connection.getAdmin();
        // target table
        TableName tableName = TableName.valueOf("t4");
        Table table = connection.getTable(tableName);
        // move the generated HFiles into the table's regions
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"), admin, table,
                connection.getRegionLocator(tableName));
    }
}
~~~
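After the bulk load completes, a quick scan of `t4` confirms the rows are visible. This is a sketch; the 10-row limit is an arbitrary choice.

~~~java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t4"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int n = 0;
            for (Result result : scanner) {
                // print rowkey, f1:name and f1:age for the first few rows
                System.out.println(Bytes.toString(result.getRow()) + " "
                        + Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))) + " "
                        + Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))));
                if (++n >= 10) break;
            }
        }
    }
}
~~~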
### Integrating HBase with Hive

#### Integration configuration

1. Edit `hive-site.xml` and add the configuration property:

~~~xml
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>node1:2181,node2:2181,node3:2181</value>
</property>
~~~

2. Edit `hive-env.sh` and add the HBase jars to Hive's classpath:

~~~bash
export HIVE_CLASSPATH=$HIVE_CLASSPATH:/hbase/lib/*
~~~

3. Use the recompiled `hive-hbase-handler-1.2.1.jar` to replace the one originally shipped under Hive's `lib` directory.

#### Mapping an HBase table to a Hive table

Create a Hive table backed by HBase:

~~~sql
create external table hiveFromHbase(
  rowkey string,
  f1 map<STRING,STRING>,
  f2 map<STRING,STRING>,
  f3 map<STRING,STRING>
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")
TBLPROPERTIES ("hbase.table.name" = "hbase_test");
~~~

- An external table is used to map the table in HBase: dropping the table in Hive does not drop the underlying HBase table, whereas with a managed table it would. Apart from the row key, the other three fields use a Map to hold each HBase column family.
- `hbase.columns.mapping`: the field mapping between the Hive and HBase tables — the first Hive field maps to `:key` (the row key), the second to column family `f1`, the third to `f2`, and the fourth to `f3`.
- `hbase.table.name`: the name of the table in HBase.

Query the Hive table:

~~~sql
select * from hivefromhbase;
~~~

#### Mapping a Hive table to an HBase table

1. Create a Hive table that maps to HBase:

~~~sql
create table hive_test(
  id string,
  name string,
  age int,
  address string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:name,f2:age,f3:address")
TBLPROPERTIES ("hbase.table.name" = "hbaseFromhive");
~~~

2. Check in HBase that the mapped table has been created. Since the Hive table was only just created, it has no data yet, and neither does the HBase table.
3. Load data into the Hive table:

~~~sql
insert into table hive_test select * from hive_source;
~~~

### HBase data backup

#### Backing up a table with the classes shipped with HBase

Use the classes shipped with HBase to export a table's data to HDFS, then import it into a backup table.

(1) Export from the HBase table.

Export HBase data to HDFS:

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.Export test /hbase_data/test_bak
~~~

Export HBase data to a local file:

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.Export test file:///home/hadoop/test_bak
~~~

(2) Import into the backup target table.

Import the data on HDFS into the backup target table:

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak /hbase_data/test_bak/*
~~~

Import the data in the local file into the backup target table:

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak file:///home/hadoop/test_bak/*
~~~

#### Backing up a table with snapshots

- Snapshots are the usual way to migrate and copy HBase data: they are efficient and the most recommended approach.
- An HBase snapshot is essentially a collection of metadata (a file list); with that metadata, the table's data can be rolled back to its state at the moment the snapshot was taken.

HBase shell commands:

~~~
# take a snapshot
snapshot 'tableName', 'snapshotName'

# list all snapshots
list_snapshots

# find snapshots whose names start with "test"
list_snapshots 'test.*'

# restore a snapshot; the table must be disabled first, then re-enabled
disable 'tableName'
restore_snapshot 'snapshotName'
enable 'tableName'

# delete a snapshot
delete_snapshot 'snapshotName'
~~~

Export a snapshot to another cluster:

~~~bash
hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
  -snapshot snapshotName \
  -copy-from hdfs://src-hbase-root-dir/hbase \
  -copy-to hdfs://dst-hbase-root-dir/hbase \
  -mappers 1 \
  -bandwidth 1024
~~~

For example:

~~~bash
hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
  -snapshot test \
  -copy-from hdfs://node1:9000/hbase \
  -copy-to hdfs://node1:9000/hbase1 \
  -mappers 1 \
  -bandwidth 1024
~~~

This is used when migrating a snapshotted table to another cluster. The copy runs as a MapReduce job and is fast; remember to set the `-bandwidth` parameter so the copy does not saturate the network and disrupt online traffic.

- Import the exported snapshot with bulk load:

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
  hdfs://dst-hbase-root-dir/hbase/archive/datapath/tablename/filename \
  tablename
~~~

For example, create a new table in the HBase shell and then load into it:

~~~
create 'newTest','f1','f2'
~~~

~~~bash
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://node1:9000/hbase1/archive/data/default/test/6325fabb429bf45c5dcbbe672225f1fb newTest
~~~

### HBase secondary indexes

To make HBase queries more efficient and cover more scenarios — sub-second lookups on non-rowkey columns, fuzzy matching, and multi-column combined queries — a secondary index has to be built on top of HBase to satisfy these more complex business needs. In essence, an HBase secondary index is a mapping between column values and row keys. Common ways to build one (a MapReduce-based sketch follows the list):

* MapReduce
* HBase coprocessors
* Solr + HBase
* ES + HBase
* Phoenix + HBase
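As a concrete illustration of the MapReduce option above, here is a minimal sketch of a map-only job that reads a data table and writes an inverted index (column value → row key) into a separate index table. The table names `test` and `test_name_index` and the indexed column `f1:name` are assumptions for the sketch, not from the original article.

~~~java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class IndexBuilder {

    public static class IndexMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable rowkey, Result result, Context context)
                throws IOException, InterruptedException {
            // value of f1:name in the data table
            byte[] name = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            if (name == null) {
                return;
            }
            // index row key = the indexed column value;
            // one column per original row key, so duplicate names don't overwrite each other
            Put put = new Put(name);
            put.addColumn(Bytes.toBytes("f1"), rowkey.get(), rowkey.get());
            context.write(new ImmutableBytesWritable(name), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "build name index");
        job.setJarByClass(IndexBuilder.class);
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        // read the data table, write Puts straight into the index table (map-only job)
        TableMapReduceUtil.initTableMapperJob("test", scan, IndexMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("test_name_index", null, job);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~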
### Summary

The above covers the common ways of working with HBase: running MapReduce over HBase tables, importing data from HDFS, bulk loading via HFiles, integrating with Hive, backing up tables with Export/Import and snapshots, and the main options for secondary indexing. Hopefully it helps you solve the problems you run into.