手机协处理器java,HBase1.x实战:协处理器Java开发实例--ObserverCoprocessor
概述:
HBase日常開發中可能需要根據查詢條件對固定表部分列建立二級索引存到另外一張表,或要求在插入A表的同時,需要同步部分列到B表中,我們就可以通過協處理實現這個需求:
實例:
A表:
表名:student
主鍵:rowID
列族:info,
列:info:name,info:age,info:score,info:adress,info:phoneNumber
B表:
表名:index_student_table
主鍵:name(來自A表)
列族:info,
列:info:age,info:score
RegionObserver協處理器類PutIndexObserver,向A表插入數據的同時,協處理器向B表中name為主鍵,age,score為列插入數據。
代碼實例:
package com.unicom.ljs.hbase125.coprocessor;import java.io.IOException;import java.util.List;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Durability;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;import org.apache.hadoop.hbase.coprocessor.ObserverContext;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.regionserver.wal.WALEdit;import org.apache.hadoop.hbase.util.Bytes;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-02-02 13:28* @version: v1.0* @description: com.unicom.ljs.hbase125.coprocessor*//*協處理器,獲取put數據,向index_student_table表同步*/public class PutIndexObserver extends BaseRegionObserver{@Overridepublic void postPut(ObserverContext env,Put put, WALEdit edit, Durability durability) throws IOException {/*獲取協處理器需要插入數據的索引表*/HTableInterface table = env.getEnvironment().getTable(TableName.valueOf("index_student_table"));// 獲取值List cellList1 = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));List cellList2 = put.get(Bytes.toBytes("info"), Bytes.toBytes("age"));List cellList3 = put.get(Bytes.toBytes("info"), Bytes.toBytes("score"));// 以name作為rowkey age,score作為作為info列族下的列名插入索引表for (Cell cell1 : cellList1) {// 列info:name的值作為二級索引表的rowkeyPut indexPut = new Put(CellUtil.cloneValue(cell1));for (Cell cell2 : cellList2) {indexPut.add(Bytes.toBytes("info"), Bytes.toBytes("age"), CellUtil.cloneValue(cell2));}for (Cell cell3 : cellList3) {indexPut.add(Bytes.toBytes("info"), Bytes.toBytes("score"), CellUtil.cloneValue(cell3));}// 插入索引表數據table.put(indexPut);}// 關閉表table.close();}}
數據插入主函數代碼:
package com.unicom.ljs.hbase125.coprocessor;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;import java.util.ArrayList;import java.util.List;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-02-02 15:07* @version: v1.0* @description: com.unicom.ljs.hbase125.coprocessor*/public class StudentCoprocessor{public static Configuration conf =null;public static Connection conn =null;public static final String tableName="student";public static final int insertCount=1;public static void main(String[] args) throws Exception{conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","salver158.hadoop.unicom,salver31.hadoop.unicom,salver32.hadoop.unicom");conf.set("hbase.zookeeper.property.clientPort", "2181");conf.set("zookeeper.znode.parent", "/hbase-unsecure");conn = ConnectionFactory.createConnection(conf);// 獲取表HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));List insertPutList=new ArrayList();for(int i=0;i
驗證步驟:
1.將代碼打jar包,上傳到hdfs的/tmp目錄下
hadoop fs -put studentCoprocessor.jar /tmp/
2.登錄hbase shell控制臺,新建表student,添加協處理器,依次執行:
create?'student','info'disable?'student'alter'student',METHOD =>'table_att','coprocessor' =>'hdfs://10.124.165.98:8020/tmp/studentCoprocessor.jar|com.unicom.ljs.hbase125.coprocessor.PutIndexObserver|100'enable'student'
3.新建需要同步的數據表index_student_table
create'index_student_table','info'
4.執行“數據插入主函數”,查看表數據進行驗證:
5.表student和表index_student_table都已經插入了數據,驗證完成。
總結
以上是生活随笔為你收集整理的手机协处理器java,HBase1.x实战:协处理器Java开发实例--ObserverCoprocessor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 开源 网络流量统计_jpcap
- 下一篇: java美元兑换,(Java实现) 美元