HBase 数据导入功能实现方式解释
https://www.ibm.com/developerworks/cn/opensource/os-cn-data-import/index.html
預(yù)備知識:啟動 HBase
清單 1. 修改 hosts 文件
| 1 2 3 | [root@node1:2 hbase-0.96.1.1-cdh5.0.1]# cat /etc/hosts 10.17.139.186 node1 10.17.139.185 scheduler2 |
清單 2. 啟動 HBase 服務(wù)
| 1 2 3 4 5 6 7 8 9 10 11 12 | [root@node1:2 bin]# ./start-hbase.sh starting master, logging to /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs/hbase-root-master-node1.out [root@node1:2 bin]# jps 2981 SchedulerServer 46776 Jps 29242 org.eclipse.equinox.launcher_1.1.0.v20100507.jar 2686 IvmsSchedulerDog 46430 HMaster [root@node1:2 bin]# ps -ef | grep hbase root 46415 1 0 09:34 pts/2 00:00:00 bash /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/hbase-daemon.sh --config /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../conf internal_start master root 46430 46415 91 09:34 pts/2 00:00:19 /usr/share/jdk1.8.0_45/bin/java -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -Xmx1000m -XX:+UseConcMarkSweepGC -Dhbase.log.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs -Dhbase.log.file=hbase-root-master-node1.log -Dhbase.home.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/.. -Dhbase.id.str=root -Dhbase.root.logger=INFO,RFA -Dhbase.security.logger=INFO,RFAS org.apache.hadoop.hbase.master.HMaster start root 47464 1078 0 09:34 pts/2 00:00:00 grep hbase |
清單 3. 插入若干數(shù)據(jù)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | hbase(main):002:0> put 'test', 'row1', 'cf:a', 'value1' 0 row(s) in 0.1180 seconds => ["test"] hbase(main):004:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 1row(s) in 0.0380 seconds hbase(main):005:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0170 seconds hbase(main):006:0> put 'test', 'row3', 'cf:c', 'value3' 0 row(s) in 0.0130 seconds hbase(main):007:0> scan 'test' ROW COLUMN+CELL ?row1 column=cf:a, timestamp=1439861879625, value=value1 ?row2 column=cf:b, timestamp=1439861962080, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0270 seconds hbase(main):008:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0080 seconds hbase(main):009:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 row2 column=cf:b, timestamp=1439861984176, value=value2 ?row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0230 seconds hbase(main):013:0> put 'test','row1','cf:a','value2' 0 row(s) in 0.0150 seconds hbase(main):014:0> scan 'test' ROW COLUMN+CELL row1 column=cf:1, timestamp=1439862083677, value=value1 row1 column=cf:a, timestamp=1439862100401, value=value2 row2 column=cf:b, timestamp=1439861984176, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3 |
向 HBase 導(dǎo)入數(shù)據(jù)
注意:本文代碼基于 HBase0.94 版本。
數(shù)據(jù)導(dǎo)入到 HBase,我們必須考慮分布式環(huán)境下的數(shù)據(jù)合并問題,而數(shù)據(jù)合并問題一直是 HBase 的難題,因為數(shù)據(jù)合并需要頻繁執(zhí)行寫操作任務(wù),解決方案是我們可以通過生成 HBase 的內(nèi)部數(shù)據(jù)文件,這樣可以做到直接把數(shù)據(jù)文件加載到 HBase 數(shù)據(jù)庫對應(yīng)的數(shù)據(jù)表。這樣的做法寫入 HBase 的速度確實很快,但是如果合并過程中 HBase 的配置不是很正確,可能會造成寫操作阻塞。目前我們常用的數(shù)據(jù)導(dǎo)入方法有 HBase Client 調(diào)用方式、MapReduce 任務(wù)方式、Bulk Load 工具方式、Sqoop 工具方式這四種。下面的文章內(nèi)容會逐一展開講解。
下面的幾種方式都可以通過 HFile 的幫助做到快速數(shù)據(jù)導(dǎo)入,我們首先在這里先給出生成 HFile 的 Java 代碼,后面各個方法內(nèi)部再按照各自方式插入 HFile 文件到 HBase 數(shù)據(jù)庫。代碼如清單 4 所示。
清單 4. 生成 HFile 代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | import org.apache.hadoop.conf.Configuration; ??…… public class generateHFile { public static class generateHFileMapper extends Mapper<LongWritable, ????????????????Text, ImmutableBytesWritable, KeyValue> { ?@Override ?protected void map(LongWritable key, Text value, Context context) ?throws IOException, InterruptedException { ?String line = value.toString(); ?String[] items = line.split(",", -1); ?ImmutableBytesWritable rowkey = new ImmutableBytesWritable(items[0].getBytes()); ?KeyValue kvProtocol = new KeyValue(items[0].getBytes(), "colfam1".getBytes(), ?????????????????????????"colfam1".getBytes(), items[0].getBytes()); ?if (null != kvProtocol) { ?context.write(rowkey, kvProtocol); ?} ?} ?} public static void main(String[] args) throws IOException, ??????????????????????InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create(); System.out.println("conf="+conf); HTable table = new HTable(conf, "testtable1"); System.out.println("table="+table); Job job = new Job(conf, "generateHFile"); job.setJarByClass(generateHFile.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setMapperClass(generateHFileMapper.class); job.setReducerClass(KeyValueSortReducer.class); job.setOutputFormatClass(HFileOutputFormat.class);//組織成 HFile 文件 //自動對 job 進行配置,SimpleTotalOrderPartitioner 是需要先對 key 進行整體排序, //然后劃分到每個 reduce 中,保證每一個 reducer 中的的 key 最小最大值區(qū)間范圍,是不會有交集的。 HFileOutputFormat.configureIncrementalLoad(job, table); ?FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
運行代碼后生成的 HFile 文件放著后面要用。
Client API 方法
使用 HBase 的 API 中的 Put 方法是最直接的數(shù)據(jù)導(dǎo)入方式,如清單 3 我們就是采用 HBase 自帶的 Shell 工具,調(diào)用 put 命令插入了幾條數(shù)據(jù)作為演示。該方式的缺點是當(dāng)需要將海量數(shù)據(jù)在規(guī)定時間內(nèi)導(dǎo)入 HBase 中時,需要消耗較大的 CPU 和網(wǎng)絡(luò)資源,所以這個方式適用于數(shù)據(jù)量較小的應(yīng)用環(huán)境。
使用 Put 方法將數(shù)據(jù)插入 HBase 中的方式,由于所有的操作均是在一個單獨的客戶端執(zhí)行,所以不會使用到 MapReduce 的 job 概念,即沒有任務(wù)的概念,所有的操作都是逐條插入到數(shù)據(jù)庫中的。大致的流程可以分解為 HBase Client--->HTable---->Hmastermanager/ZK(獲取-root-,--meta--)------>HregionServer----->Hregion------>Hlog/Hmemstore----->HFile。即 HBase Client 調(diào)用 HTable 類訪問到 HMaster 的原數(shù)據(jù)保存地點,然后通過找到相應(yīng)的 Region Server,并分配具體的 Region,最后操作到 HFile 這一層級。當(dāng)連接上 HRegionServer 后,首先獲得鎖,然后調(diào)用 HRegion 類對應(yīng)的 put 命令開始執(zhí)行數(shù)據(jù)導(dǎo)入操作,數(shù)據(jù)插入后還要寫時間戳、寫 Hlog,WAL(Write Ahead Log)、Hmemstore。具體實現(xiàn)代碼如清單 5 所示,在代碼中我們嘗試插入了 10 萬條數(shù)據(jù),打印出插入過程消耗的時間。
清單 5. 采用 HBase Client 方式代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class PutDemo { ?public static void main(String[] args) throws IOException { ?//創(chuàng)建 HBase 上下文環(huán)境 ?Configuration conf = HBaseConfiguration.create(); ?System.out.println("conf="+conf); ?int count=0; ?? ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?System.out.println("helper="+helper); ?helper.dropTable("testtable1"); ?helper.createTable("testtable1", "colfam1"); ?? ?HTable table = new HTable(conf, "testtable1"); ?long start = System.currentTimeMillis(); for(int i=1;i<100000;i++){ //設(shè)置 rowkey 的值 ?Put put = new Put(Bytes.toBytes("row"+i)); // 設(shè)置 family:qualifier:value ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), ?Bytes.toBytes("val1")); ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), ?Bytes.toBytes("val2")); ?//調(diào)用 put 方法,插入數(shù)據(jù)導(dǎo) HBase 數(shù)據(jù)表 testtable1 里 ?table.put(put); ?count++; ?if(count%10000==0){ ?System.out.println("Completed 10000 rows insetion"); ?} ?} ?? ?System.out.println(System.currentTimeMillis() - start); ?} } |
清單 6. 采用 HBase Client 方式代碼運行輸出
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | conf=Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml 2015-08-20 18:58:18,184 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-08-20 18:58:18,272 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-cdh4.6.0--1, built on 02/26/2014 09:15 GMT 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=node3 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.7.0_79 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/lib/jdk1.7.0_79/jre 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=./zz.jar 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=/tmp 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA> 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Linux 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=2.6.32-220.el6.x86_64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=/root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=/home/zhoumingyao 2015-08-20 18:58:18,277 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=hconnection 2015-08-20 18:58:18,294 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,300 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,308 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,317 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0023, negotiated timeout = 180000 2015-08-20 18:58:18,394 WARN [main] conf.Configuration (Configuration.java:warnOnceIfDeprecated(981)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available helper=HBaseHelper@5d48e5d6 2015-08-20 18:58:18,570 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,571 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:18,575 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0024, negotiated timeout = 180000 2015-08-20 18:58:18,647 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0024 closed 2015-08-20 18:58:18,647 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:18,672 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTableAsync(858)) - Started disable of testtable1 2015-08-20 18:58:18,676 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,678 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,679 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,680 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,683 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0025, negotiated timeout = 180000 2015-08-20 18:58:18,705 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0025 closed 2015-08-20 18:58:18,705 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,713 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:19,714 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:19,715 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:19,716 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:19,720 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0026, negotiated timeout = 180000 2015-08-20 18:58:19,733 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0026 closed 2015-08-20 18:58:19,733 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,735 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTable(905)) - Disabled testtable1 2015-08-20 18:58:20,763 INFO [main] client.HBaseAdmin (HBaseAdmin.java:deleteTable(656)) - Deleted testtable1 table=testtable1 2015-08-20 18:58:21,809 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:21,810 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:21,811 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:21,812 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:21,816 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0027, negotiated timeout = 180000 2015-08-20 18:58:21,828 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0027 closed 2015-08-20 18:58:21,828 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion 127073ms |
整個插入 10 萬條數(shù)據(jù)的耗時達到了 127 秒,即 2 分鐘。清單 7 所示是清單 5 代碼中用到的類源代碼。
清單 7.HBaseHelper 類代碼部分相關(guān)代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | import org.apache.hadoop.conf.Configuration; …… /** ?* Used by the book examples to generate tables and fill them with test data. ?*/ public class HBaseHelper { //在 Java 代碼中,為了連接到 HBase,我們首先創(chuàng)建一個配置(Configuration)對象,使用該對象創(chuàng)建一個 HTable 實例。 //這個 HTable 對象用于處理所有的客戶端 API 調(diào)用。 ?private Configuration conf = null; ?private HBaseAdmin admin = null; ?protected HBaseHelper(Configuration conf) throws IOException { ?this.conf = conf; ?this.admin = new HBaseAdmin(conf); ?} ?public static HBaseHelper getHelper(Configuration conf) throws IOException { ?return new HBaseHelper(conf); ?} ?public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?} ?public void put(String table, String[] rows, String[] fams, String[] quals, ?long[] ts, String[] vals) throws IOException { ?HTable tbl = new HTable(conf, table); ?for (String row : rows) { ?Put put = new Put(Bytes.toBytes(row)); ?for (String fam : fams) { ?int v = 0; ?for (String qual : quals) { ?String val = vals[v < vals.length ? v : vals.length]; ?long t = ts[v < ts.length ? v : ts.length - 1]; ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), t, ?Bytes.toBytes(val)); ?v++; ?} ?} ?tbl.put(put); ?} ?tbl.close(); ?} ?public void dump(String table, String[] rows, String[] fams, String[] quals) ?throws IOException { ?HTable tbl = new HTable(conf, table); ?List<Get> gets = new ArrayList<Get>(); ?for (String row : rows) { ?Get get = new Get(Bytes.toBytes(row)); ?get.setMaxVersions(); ?if (fams != null) { ?for (String fam : fams) { ?for (String qual : quals) { ?get.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual)); ?} ?} ?} ?gets.add(get); ?} ?Result[] results = tbl.get(gets); ?for (Result result : results) { ?for (KeyValue kv : result.raw()) { ?System.out.println("KV: " + kv + ?", Value: " + Bytes.toString(kv.getValue())); ?} ?} ?} } public void dropTable(String table) throws IOException { ?if (existsTable(table)) { ?disableTable(table); ?admin.deleteTable(table); ?} ?} public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?} |
MapReduce 方法
如果需要通過編程來生成數(shù)據(jù),那么用 importtsv 工具不是很方便,這時候可以使用 MapReduce 向 HBase 導(dǎo)入數(shù)據(jù),但海量的數(shù)據(jù)集會讓 MapReduce Job 變得很繁重,若處理不當(dāng),則可能使得 MapReduce 的 job 運行時的吞吐量很小。由于 MapReduce 在寫 HBase 是采用的是 TableOutputFormat 方式,這樣在寫入數(shù)據(jù)庫的時候容易對寫入塊進行頻繁的刷新、分割、合并操作,這些操作都是較為耗費磁盤 I/O 的操作,最終導(dǎo)致 HBase 節(jié)點的不穩(wěn)定性。
前面介紹過生成 HFile 的代碼,生成 HFile 后,我們可以采用 MapReduce 方式把數(shù)據(jù)導(dǎo)入到 HBase 數(shù)據(jù)表里,具體代碼如清單 8 所示。
清單 8.MapReduce 方式導(dǎo)入 HFile 到 HBase 數(shù)據(jù)表
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | import java.io.IOException; …… public class HBaseImportByMapReduce extends Configured implements Tool { static final Log LOG = LogFactory.getLog(HBaseImportByMapReduce.class); public static final String JOBNAME = "MapReduceImport"; public static class Map extends Mapper<LongWritable , ??????????????????????Text, NullWritable, NullWritable>{ ?Configuration configuration = null; ?HTable xTable = null; ?static long count = 0; ?? ?@Override ?protected void cleanup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.cleanup(context); ?xTable.flushCommits(); ?xTable.close(); ?} ?? ?@Override ?protected void map(LongWritable key, Text value, Context context) ??????????????????????????????throws IOException, InterruptedException { ?String all[] = value.toString().split("/t"); ?Put put = new Put(Bytes.toBytes(all[0])); ?put.add(Bytes.toBytes("colfam1"),Bytes.toBytes("value1"), null); ?xTable.put(put); ?if ((++count % 100)==0) { ?context.setStatus(count +" DOCUMENTS done!"); ?context.progress(); ?System.out.println(count +" DOCUMENTS done!"); ?} ?} ?@Override ?protected void setup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.setup(context); ?configuration = context.getConfiguration(); ?xTable = new HTable(configuration,"testtable2"); ?xTable.setAutoFlush(false); ?xTable.setWriteBufferSize(12*1024*1024); ?} } @Override public int run(String[] args) throws Exception { ?String input = args[0]; ?Configuration conf = HBaseConfiguration.create(getConf()); ?conf.set("hbase.master", "node1:60000"); ?Job job = new Job(conf,JOBNAME); ?job.setJarByClass(HBaseImportByMapReduce.class); ?job.setMapperClass(Map.class); ?job.setNumReduceTasks(0); ?job.setInputFormatClass(TextInputFormat.class); ?TextInputFormat.setInputPaths(job, input); ?job.setOutputFormatClass(NullOutputFormat.class); ?return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws IOException { ?Configuration conf = new Configuration(); ?String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); ?int res = 1; ?try { ?res = ToolRunner.run(conf, new HBaseImportByMapReduce(), otherArgs); ?} catch (Exception e) { ?e.printStackTrace(); ?} ?System.exit(res); } } |
清單 8 所示的 MapReduce 方式,啟動任務(wù)需要一些時間,如果數(shù)據(jù)量較大,整個 Map 過程也會消耗較多時間。
其實一般來說 MapReduce 方式和后面要介紹的 Bulk Load 方式是配合使用的,MapReduce 負責(zé)生成 HFile 文件,Bulk Load 負責(zé)導(dǎo)入 HBase。
Bulk Load方式
總的來說,使用 Bulk Load 方式由于利用了 HBase 的數(shù)據(jù)信息是按照特定格式存儲在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數(shù)據(jù)格式文件,然后完成巨量數(shù)據(jù)快速入庫的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會產(chǎn)生巨量的寫入 I/O,所以需要較少的 CPU 和網(wǎng)絡(luò)資源。Bulk Load 的實現(xiàn)原理是通過一個 MapReduce Job 來實現(xiàn)的,通過 Job 直接生成一個 HBase 的內(nèi)部 HFile 格式文件,用來形成一個特殊的 HBase 數(shù)據(jù)表,然后直接將數(shù)據(jù)文件加載到運行的集群中。使用 Bulk Load 功能最簡單的方式就是使用 ImportTsv 工具,ImportTsv 是 HBase 的一個內(nèi)置工具,目的是從 TSV 文件直接加載內(nèi)容至 HBase。它通過運行一個 MapReduce Job, 將數(shù)據(jù)從 TSV 文件中直接寫入 HBase 的表或者寫入一個 HBase 的自有格式數(shù)據(jù)文件。
ImportTsv 本身是一個在 HBase 的 JAR 文件中的 Java 類,使用 ImportTsv 工具,首先創(chuàng)建一個數(shù)據(jù)文件,如清單 9 所示,我們創(chuàng)建了一個 data.tsv 文件,包含 4 條數(shù)據(jù)。
清單 9.data.tsv
| 1 2 3 4 5 | [root@node3 zhoumingyao]# vi data.tsv 1001 name1 17 00000000001 1002 name2 16 00000000002 1003 name3 16 00000000003 1004 name4 16 00000000004 |
由于 ImportTsv 工具只支持從 HDFS 中讀取數(shù)據(jù),所以一開始我們需要將 TSV 文件從本地文件系統(tǒng)拷貝到 HDFS 中,接下來我們在 HDFS 里新建文件夾后上傳 data.tsv 文件到該文件夾,由于讀和寫的操作是在多臺服務(wù)器上并行執(zhí)行,所以相比從單臺節(jié)點讀取速度快很多。需要指定輸出 (-Dimporttsv.bulk.output), 否則默認會采用 HBase API 方式插入數(shù)據(jù)。代碼如清單 10 所示。
清單 10. 調(diào)用 ImportTsv
| 1 2 3 4 5 6 7 | $HADOOP_HOME/bin/hadoop fs -mkdir /user/test 創(chuàng)建數(shù)據(jù)表 create 'student', {NAME => 'info'} 調(diào)用 importtsv 命令導(dǎo)入數(shù)據(jù), $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:phone ??????????????????????-Dimporttsv.bulk.output=/user/test/output/ student /user/test/data.tsv |
記住需要啟動 YARN,否則會報錯,如清單 11 所示。
清單 11. 錯誤提示
| 1 2 3 | 15/08/21 13:41:27 INFO ipc.Client: Retrying connect to ??????????????server: node1/172.10.201.62:18040. Already tried 0 time(s); ????retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) |
ImportTsv 工具默認使用了 HBase 的 Put API 來將數(shù)據(jù)插入 HBase 表中,在 Map 階段使用的是 TableOutputFormat。但是當(dāng)-Dimporttsv.bulk. 輸入選項被指定時,會使用 HFileOutputFormat 來代替在 HDFS 中生成 HBase 的自有格式文件(HFile)。而后我們能夠使用 completebulkload 來加載生成的文件到一個運行的集群中。根據(jù)清單 12 可以使用 bulk 輸出以及加載工具。
清單 12. 調(diào)用 completebulkload
| 1 2 3 4 5 6 7 8 9 | 創(chuàng)建生成文件的文件夾: $HADOOP_HOME/bin/hadoop fs -mkdir /user/hac/output 開始導(dǎo)入數(shù)據(jù): $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??importtsv -Dimporttsv.bulk.output=/user/hac/output/2-1 -Dimporttsv.columns= ????????????HBASE_ROW_KEY,info:name,info:age,info:phone student /user/hac/input/2-1 完成 bulk load 導(dǎo)入 $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????????completebulkload /user/hac/output/2-1 student |
Completebulkload 工具讀取生成的文件,判斷它們歸屬的 Resgion Server 族群,然后訪問適當(dāng)?shù)淖迦悍?wù)器。族群服務(wù)器會將 HFile 文件轉(zhuǎn)移進自身存儲目錄中,并且為客戶端建立在線數(shù)據(jù)。
HBase 說明文檔里面記載,Bulk Load 方法分為兩個主要步驟:
1. 使用 HFileOutputFormat 類通過一個 MapReduce 任務(wù)方式生成 HBase 的數(shù)據(jù)文件,就是英文稱為“StoreFiles”的數(shù)據(jù)文件。由于輸出的時候按照 HBase 內(nèi)部的存儲格式來輸出數(shù)據(jù),所以后面讀入 HBase 集群的時候就非常高效了。為了保證高效性,HFileOutputFormat 借助 configureIncrementalLoad 函數(shù),基于當(dāng)前 Table 的各 Region 邊界自動匹配 MapReduce 的分區(qū)類 TotalOrderPartitioner,這樣每一個輸出的 HFile 都會是在一個單獨的 Region 里面的。
為了實現(xiàn)這樣的設(shè)計,所有任務(wù)的輸出都需要使用 Hadoop 的 TotalOrderPartitioner 類去對輸出進行分區(qū),按照 Regions 的主鍵范圍進行分區(qū)。HFileOutputFormat 類包含了一個快捷方法,即 configureIncrementalLoad(),它自動基于數(shù)據(jù)表的當(dāng)前 region 間隔生成一個 TotalOrderPartitioner。
2. 完成數(shù)據(jù)載入到 HBase。當(dāng)所有的數(shù)據(jù)都被用 HFileOutputFormat 方式準(zhǔn)備好以后,我們可以使用 completebulkload 讀入到集群。這個命令行工具迭代循環(huán)數(shù)據(jù)文件,對于每一個數(shù)據(jù)文件迅速找到屬于它的 region,然后 Region 服務(wù)器會讀入這些 HFile。如果在生成文件的過程當(dāng)中 region 被修改了,那 completebulkload 工具會自動切分數(shù)據(jù)文件到新的區(qū)域,這個過程需要花費一些時間。如果數(shù)據(jù)表 (此處是 mytable) 不存在,工具會自動創(chuàng)建該數(shù)據(jù)表。
如清單 13 所示,我們也調(diào)用方法直接載入 HFile 文件到 HBase,采用 Bulk Load 方式完成這個實驗。
清單 13.Bulk Load 方式載入 HFile
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; ?? public class loadIncrementalHFileToHBase { ?? ?public static void main(String[] args) throws Exception { ?Configuration conf = HBaseConfiguration.create(); ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?helper.dropTable("testtable2"); ?helper.createTable("testtable2", "colfam1"); ?HTable table = new HTable("testtable2"); ?LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); ?loader.doBulkLoad(new Path(args[0]), table); ?} ?? } |
特別提醒:
1. 一定記得建 HBase 數(shù)據(jù)表時做 Region 的預(yù)切分,HFileOutputFormat.configureIncrementalLoad 方法會根據(jù) Region 的數(shù)量來決定 Reduce 的數(shù)量以及每個 Reduce 覆蓋的 RowKey 范圍,否則單個 Reduce 過大,容易造成任務(wù)處理不均衡。造成這個的原因是,創(chuàng)建 HBase 表的時候, 默認只有一個 Region, 只有等到這個 Region 的大小超過一定的閾值之后, 才會進行 split,所以為了利用完全分布式加快生成 HFile 和導(dǎo)入 HBase 中以及數(shù)據(jù)負載均衡, 我們需要在創(chuàng)建表的時候預(yù)先進行分區(qū), 而進行分區(qū)時要利用 startKey 與 endKey 進行 rowKey 區(qū)間劃分 (因為導(dǎo)入 HBase 中, 需要 rowKey 整體有序)。解決方法是在數(shù)據(jù)導(dǎo)入之前, 自己先寫一個 MapReduce 的 Job 求最小與最大的 rowKey,即 startKey 與 endKey。
2. 單個 RowKey 下的子列不要過多,否則在 reduce 階段排序的時候會造成內(nèi)存溢出異常,有一種辦法是通過二次排序來避免 reduce 階段的排序,這個解決方案需要視具體應(yīng)用而定。
Sqoop 方法
Sqoop 是 Apache 頂級項目,主要用于在 Hadoop(Hive) 與傳統(tǒng)的數(shù)據(jù)庫 (mysql、postgresql 等等) 之間進行數(shù)據(jù)的傳遞,可以將一個關(guān)系型數(shù)據(jù)庫,例如 MySQL,Oracle,Postgres 等中的數(shù)據(jù)導(dǎo)入到 Hadoop 的 HDFS 中,也可以將 HDFS 的數(shù)據(jù)導(dǎo)進到關(guān)系型數(shù)據(jù)庫中。Sqoop 支持多種導(dǎo)入方式,包括指定列導(dǎo)入,指定格式導(dǎo)入,支持增量導(dǎo)入(有更新才導(dǎo)入)等等。Sqoop 的一個特點就是可以通過 Hadoop 的 MapReduce 把數(shù)據(jù)從關(guān)系型數(shù)據(jù)庫中導(dǎo)入數(shù)據(jù)到 HDFS。
Sqoop 的架構(gòu)較為簡單,通過整合 Hive,實現(xiàn) SQL 方式的操作,通過整合 HBase,可以向 HBase 寫入數(shù)據(jù),通過整合 Oozie,擁有了任務(wù)流的概念。而 Sqoop 本身是通過 MapReduce 機制來保證傳輸數(shù)據(jù),從而提供并發(fā)特性和容錯機制,系統(tǒng)架構(gòu)圖如圖 1 所示,來源 Apache 官方網(wǎng)站。
圖 1.Sqoop 系統(tǒng)架構(gòu)圖
在使用上,Sqoop 對外提供了一組操作命令,只需要簡單配置就可以進行數(shù)據(jù)的轉(zhuǎn)移。
首先配置 Sqoop,如清單 14 所示,對/etc/profile 文件添加兩行,然后執(zhí)行命令。
清單 14. 配置 Sqoop
| 1 2 3 | export SQOOP_HOME=/home/zhoumingyao/sqoop2-1.99.3-cdh5.0.1 export PATH = $SQOOP_HOME/bin:$PATH source /etc/profile |
我們這次做的實驗使用了 Sqoop 的 import 功能,用于將 Oracle 中的人員信息導(dǎo)入到 HBase。在 Hadoop 和 HBase 正常運行的環(huán)境里,我們首先需要配置好 Sqoop,然后調(diào)用如下的命令即可將 Oracle 中的表導(dǎo)入到 HBase 中,代碼如清單 15 所示。
清單 15.Sqoop 導(dǎo)入 Oracle 數(shù)據(jù)到 HBase
| 1 2 3 4 5 6 7 8 9 10 11 12 | sqoop import ?--connect jdbc:oracle:thin:@172.7.27.225:1521:testzmy //JDBC URL ?--username SYSTEM //Oracle username(必須大寫) ?--password hik123456 //Oracle password ?--query 'SELECT RYID, HZCZRK_JBXXB.ZPID, HZCZRK_JBXXB.GMSFHM, HZCZRK_JBXXB.XM, HZCZRK_JBXXB.XB, ?HZCZRK_JBXXB.CSRQ, HZCZRK_ZPXXB.ZP AS ZP FROM HZCZRK_JBXXB ?JOIN HZCZRK_ZPXXB USING(RYID) WHERE $CONDITIONS' // Oracle 數(shù)據(jù),Sqoop 支持多表 query ?--split-by RYID //指定并行處理切分任務(wù)的列名,通常為主鍵 --map-column-java ZP=String //ZP 為 LONG RAW 類型,sqoop 不支持,需要映射成 String ?--hbase-table TESTHZ //HBase 中的 Table ?--column-family INFO //HBase 中的 column-family |
清單 15 所示代碼從兩張數(shù)據(jù)表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB 讀取數(shù)據(jù)并寫入到 HBase 數(shù)據(jù)表 TESTHZ,該數(shù)據(jù)表有一個列祖 INFO。我們在 VMWare CentOS5.6 單節(jié)點偽分布式環(huán)境下進行了測試。測試結(jié)果顯示,單表 HZCZRK_ZPXXB 導(dǎo)入 90962 條數(shù)據(jù)耗時約 27 分鐘,兩表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB JOIN 導(dǎo)入 90962 條數(shù)據(jù)耗時約 50 分鐘。
該實驗顯示 Sqoop 使用過程中的局限性:
1. Import 中進行多表 query 的方式效率會受到影響;
2. 不支持從數(shù)據(jù)庫的視圖導(dǎo)出數(shù)據(jù);
3. 不支持 BLOB、RAW 等大數(shù)據(jù)塊類型直接導(dǎo)入到 HBase,需要通過--map-column-java 將對應(yīng)的列映射成 Java 的基本類型 String 來處理;
4. 每次 import 只能導(dǎo)入到 HBase 的一個 column family。
總的來說,Sqoop 類似于其他 ETL 工具,使用元數(shù)據(jù)模型來判斷數(shù)據(jù)類型并在數(shù)據(jù)從數(shù)據(jù)源轉(zhuǎn)移到 Hadoop 時確保類型安全的數(shù)據(jù)處理。Sqoop 專為大數(shù)據(jù)批量傳輸設(shè)計,能夠分割數(shù)據(jù)集并創(chuàng)建 Hadoop 任務(wù)來處理每個區(qū)塊。
除了上面介紹的 4 種方法的實現(xiàn),我這里還想多提一些關(guān)于數(shù)據(jù)分布、合并的注意事項。HBase 數(shù)據(jù)庫不適用于經(jīng)常更新的應(yīng)用場景,寫操作很頻繁的任務(wù)可能引起的另一個問題是將數(shù)據(jù)寫入了單一的族群服務(wù)器 (Region Server),這種情況經(jīng)常出現(xiàn)在將海量數(shù)據(jù)導(dǎo)入到一個新建的 HBase 數(shù)據(jù)庫中時。一旦數(shù)據(jù)集中在相同的服務(wù)器上,整個集群就變得不平衡,并且寫速度會顯著的降低。
結(jié)束語
數(shù)據(jù)導(dǎo)入環(huán)節(jié)屬于大數(shù)據(jù)應(yīng)用的數(shù)據(jù)清洗部分,需要嘗試多種方式將數(shù)據(jù)導(dǎo)入進去,沒有哪一種方法是唯一的選擇,我們首先要根據(jù)用戶的實際環(huán)境選擇正確的方式。總的來說,Bulk Load 方式是最快速的,我們可以優(yōu)先選擇它。
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/9181251.html
總結(jié)
以上是生活随笔為你收集整理的HBase 数据导入功能实现方式解释的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用 Spring Boot 快速构建
- 下一篇: Hook原理