trident State应用指南
trident State應用指南
@(STORM)[storm, 大數據]
- trident State應用指南
- 一State基礎示例
- 1主類
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓撲定義
- 2工廠類NameSumStateFactory
- 3更新類NameSumUpdater
- 4狀態類NameSumState
- 4state應用步驟總結
- 5state應用的一些注意事項
- 6state與MapState的差異
- 二MapState
- 1persistentAggregate
- 2MapStates
- 3Demo
- 1創建一個實現IBackingMap的類實現multiGet和multiPut方法
- 2創建實現StateFactory的類
- 3定義Count函數
- 4在拓撲中寫入state或者查詢state
- 4關于MapState的總結
- 1基本步驟
- 2全流程邏輯
- 3復雜的情況
- 4其它思考
- 5MapState讀寫mysql示例
- 1MysqlMapStateFactory
- 2MysqlMapStateBacking
- 三以HBaseMapState為例分析MapState代碼調用全過程
- 零概述 MapState被調用的全流程代碼
- 1調用過程
- TOTO按著這個流程把代碼重頭讀一遍先了解ITridentBatchBolt
- 2內容概述
- 一如何使用MapState
- 二如何實現一個MapStateHBaseMapState源碼分析
- 1Option內部類
- 2Factory內部類
- 1構造函數
- 2makeState方法
- 3構造函數
- 4返回StateFactory的方法
- 5multiGet
- 6multiPut
- 7序列化器
- 三MapState框架
- TODO補充各個類的關系圖參考P323
- 1build方法
- 2構造方法
- 3beginCommit
- TODO CachedBatchReadsMap分析
- 4commit
- 5multiGet
- 6multiPut
- 7multiUpdate
- 四storm如何調用MapState的代碼
- 1GroupedStream類
- 零概述 MapState被調用的全流程代碼
Trident及State的原理請見另一篇文章:http://blog.csdn.net/lujinhong2/article/details/47132305
簡單總結:
1、最簡單的情況使用IBacking的邏輯,很容易實現k-v格式的state。
2、如果IBacking不夠靈活(不能取得txid,不是kv而是多列的格式),則直接實現MapState的接口。
3、最復雜的是使用State接口,最靈活,但真有必要嗎?
第一二種方法比較:persistenceAggregate 第一個參數關鍵定義了如何去更新state(如mysql中的內容),比如先取出數據,更新txid,再寫回去之類的,而第二個參數定義了以什么邏輯去更新數據,如求和、計算、還是平均之類的。 因此,反正第一個參數都只是返回一個MapState對象,那使用IBacking接口還是直接使用MapState接口都可以了,只是前者作了一些txid邏輯的封裝,對應于幾種state的類型,因此使用方便了一點,便事實上,它的代碼是很簡單的,它就是通過判斷txid的關系來定義了update是如何使用get和put的,所以,可以直接實現MapState接口的update方法即可。
一、State基礎示例
trident通過spout的事務性與state的事務處理,保證了恰好一次的語義。這里介紹了如何使用state。
完整代碼請見 https://github.com/lujinhong/tridentdemo
1、主類
主類定義了拓撲的整體邏輯,這個拓撲通過一個固定的spout循環產生數據,然后統計消息中每個名字出現的次數。
拓撲中先將消息中的內容提取出來成name, age, title, tel4個field,然后通過project只保留name字段供統計,接著按照name分區后,為每個分區進行聚合,最后將聚合結果通過state寫入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其實沒什么必要,上面就不需要發射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根據name的值作分區Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());2、Aggregator的用法
這里涉及了一些trident常用的API,但project等相對容易理解,這里只介紹partitionAggregate的用法。
再看看上面代碼中對partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))第一,三個參數分別表示輸入流的名稱與輸出流的名稱。中間的NameCountAggregator是一個Aggregator的對象,它定義了如何對輸入流進行聚合。我們看一下它的代碼:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個名字是否已經存在于map中,若無,則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}(1)Aggregator接口
它實現了Aggregator接口,這個接口有3個方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }init方法:在處理batch之前被調用。init的返回值是一個表示聚合狀態的對象,該對象會被傳遞到aggregate和complete方法。
aggregate方法:為每個在batch分區的輸入元組所調用,更新狀態
complete方法:當batch分區的所有元組已經被aggregate方法處理完后被調用。
除了實現Aggregator接口,還可以實現ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見《從零開始學storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我們看一下這3個方法的實現。
(2)init方法
@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }僅初始化了一個HashMap對象,這個對象會作為參數傳給aggregate和complete方法。對一個batch只執行一次。
(3)aggregate方法
aggregate方法對于batch內的每一個tuple均執行一次。這里將這個batch內的名字出現的次數放到init方法所初始化的map中。
@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }(4)complete方法
這里在complete將aggregate處理完的結果發送出去,實際上可以在任何地方emit,比如在aggregate里面。
這個方法對于一個batch也只執行一次。
3、state的用法
(1)拓撲定義
先看一下主類中如何將結果寫入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());它的定義為:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)其中的第二個參數比較容易理解,就是輸入流的名稱,這里是名字與它出現的個數。下面先看一下Facotry。
(2)工廠類:NameSumStateFactory
很簡單,它實現了StateFactory,只有一個方法makeState,返回一個State類型的對象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); } }(3)更新類:NameSumUpdater
這個類繼承自BaseStateUpdater,它的updateState對batch的內容進行處理,這里是將batch的內容放到一個map中,然后調用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);} }(4)狀態類:NameSumState
這是state最核心的類,它實現了大部分的邏輯。NameSumState實現了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid); }分別在提交之前與提交成功的時候調用,在這里只打印了一些信息。
另外NameSumState還定義了如何處理NameSumUpdater傳遞的消息:
public void setBulk(Map<String, Integer> map) {// 將新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 將map中的當前狀態打印出來。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);} }即將NameSumUpdater傳送過來的內容寫入一個HashMap中,并打印出來。
此處將state記錄在一個HashMap中,如果需要記錄在其它地方,如mysql,則使用jdbc寫入mysql代替下面的map操作即可。
事實上,這個操作不一定要在state中執行,可以在任何類中,但建議還是在state類中實現。
4、state應用步驟總結
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());state的應用步驟相當簡單,原理也很簡單:
(1)NameSumStateFactory()指定了將結果保存在哪里,如本例中的hashset,還可以是mysql/hbase等。當然還有更新邏輯,
(2)NameSumUpdater()指定了更新state的邏輯,如將當前數據和原有數據相加等。
5、state應用的一些注意事項
(1)使用state,你不再需要比較事務id,在數據庫中同時寫入多個值等內容,而是專注于你的邏輯實現
(2)除了實現State接口,更常用的是實現MapState接口,下次補充。
(3)在拓撲中指定了StateFactory,這個工廠類找到相應的State類。而Updater則每個批次均會調用它的方法。State中則定義了如何保存數據,這里將數據保存在內存中的一個HashMap,還可以保存在mysql, hbase等等。
(4)trident會自動比較txid的值,如果和當前一樣,則不更改狀態,如果是當前txid的下一個值,則更新狀態。這種邏輯不需要用戶處理。
(5)如果需要實現透明事務狀態,則需要保存當前值與上一個值,在update的時候2個要同時處理。即邏輯由自己實現。在本例子中,大致思路是在NameSumState中創建2個HashMap,分別對應當前與上一個狀態的值,而NameSumUpdater每次更新這2個Map。
6、state與MapState的差異
(1)由上面可以看出,state需要自己指定如何更新數據
if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());} }這里是將原有的值,加上新到的值。而MapState會根據你選擇的類型(Transactional, Opaque, NonTransactional)定義好邏輯,只要定義如果向state中讀寫數據即可。
(2)MapState將State的aggreate與persistent 2部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()與partitionPersistent(),而在MapState中最后1步是persistentAggregate()
事實上,查看persistentAggregate()的實現,它最終也是分成aggregate和persistent 2個步驟的。
二、MapState
1、persistentAggregate
Trident有另外一種更新State的方法叫做persistentAggregate。如下:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎么去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實現了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合后的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); }當你在一個未經過group的stream上面進行聚合的話,Trident會期待你的state實現Snapshottable接口:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); }MemoryMapState 和 MemcachedState 都實現了上面的2個接口。
2、MapStates
在Trident中實現MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實現了所有相關的邏輯,包括容錯的邏輯。你只需要將一個IBackingMap 的實現提供給這些類就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }OpaqueMap’s會用OpaqueValue的value來調用multiPut方法,TransactionalMap’s會提供TransactionalValue中的value,而NonTransactionalMaps只是簡單的把從Topology獲取的object傳遞給multiPut。
Trident還提供了一種CachedMap類來進行自動的LRU cache。
另外,Trident 提供了 SnapshottableMap 類將一個MapState 轉換成一個 Snapshottable 對象.
大家可以看看 MemcachedState的實現,從而學習一下怎樣將這些工具組合在一起形成一個高性能的MapState實現。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。
實現一個MapState,可以實現IBackingMap接口(mutliGet()/multiPut),并且實現StateFactory接口(makeState()),返回一個State對象,這是常見的用法
* 但如果有一引起高級需求,可以直接實現MapState接口,這樣可以覆蓋一些如beginCommit(Long txid);commit(Long txid);這些方法,還有multiUpdate()。*
3、Demo
完整代碼請見 https://github.com/lujinhong/tridentdemo
- 更詳細的可以參考trident-memcached(很全面,但較舊)
https://github.com/nathanmarz/trident-memcached - 或者storm-hbase的State實現等
在Trident中實現MapState是非常簡單的,它和單純的State不同點在于:OpaqueMap, TransactionalMap 和 NonTransactionalMap會實現相關的容錯邏輯,只需為這些類提供一個IBackingMap接口實現,調用multiGet和multiPut方法訪問各自的K/V值。
public interface IBackingMap<T> {List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }詳細的步驟如下:
(1)創建一個實現IBackingMap的類,實現multiGet和multiPut方法
主要實現multiGet和multiPut的方法,實現如何從state中讀寫數據。
multiGet 的參數是一個List,可以根據key來查詢數據,key本身也是一個List,以方便多個值組合成key的情形。
multiPut的參數是一個List類型的keys和一個List類型的values,它們的size應該是相等的,把這些值寫入state中。
這里將k/v寫入了一個HashMap中,如果需要寫入mysql,則只需要使用jdbc,把db.put改為寫入mysql即可,查詢類似。
(2)創建實現StateFactory的類
public class MemoryMapStateFacotry implements StateFactory{@Overridepublic State makeState(Map conf, IMetricsContext metrics,int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MemoryMapStateBacking());}}很簡單,就返回一個實現了MapState接口的類對象,通過把上面定義的MemoryMapStateBacking對象傳入TransactionalMap.build作參數即可。當然還可以使用:
NonTransactionalMap.build(state);b OpaqueMap.build(state);(3)定義Count函數
用于說明如果將新來的數據與原來state中的數據組合更新。這里使用了storm提供的一個工具類,它將新來到的值與原有的值相加。
public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}(4)在拓撲中寫入state,或者查詢state
//這個流程用于統計單詞數據,結果將被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapStateFacotry(), new Count(),new Fields("count")).parallelismHint(16);//這個流程用于查詢上面的統計結果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();4、關于MapState的總結
(1)基本步驟
(1)創建一個實現IBackingMap的類,實現multiGet和multiPut方法
(2)創建實現StateFactory的類,它的makeState返回一個實現了MapState接口的對象,可以通過:
mapState = TransactionalMap.build(_iBacking);其中_iBacking就是第一步實現類的對象。當然還可以使用
mapState = NonTransactionalMap.build(state);mapState = OpaqueMap.build(state);TransactionalMap,OpaqueMap, NonTransactionalMap已經通過判斷txid的值實現了相應的事務邏輯,以TransactionalMap為例,它的源碼中會判斷batch中的txid與state中已經存儲的是否相同,或者同的話則新值等于舊值即可:
if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached)
(3)在拓撲中使用persistentAggregate寫入state
(2)全流程邏輯
以事務型狀態為例,我們看一下整個存儲過程的邏輯:
* 首先,persistentAggregate收到一批數據,它的第一個參數返回的是事務型的MapState
* 然后,TransactionalMap在multiUpdate中會判斷這個事務的txid與當前state中的txid是否一致。
* 如果txid一致的話,則保持原來的值即可,如果txid不一致,則更新數值。
* 如果更新數據呢?它是拿新來的值和state中的原有的值,使用persistentAggregate中第2個參數定義的類方法作聚合計算。
* 第一個參數關鍵定義了如何去更新state(如mysql中的內容),比如先取出數據,更新txid,再寫回去之類的,而第二個參數定義了以什么邏輯去更新數據,如求和、計算、還是平均之類的。* 因此,反正第一個參數都只是返回一個MapState對象,那使用IBacking接口還是直接使用MapState接口都可以了,只是前者作了一些txid邏輯的封裝,對應于幾種state的類型,因此使用方便了一點,便事實上,它的代碼是很簡單的,它就是通過判斷txid的關系來定義了update是如何使用get和put的,所以,可以直接實現MapState接口的update方法即可。
persistentAggregate的第2個參數定義了數據是如何更新的,而IBackingMap中的multiGet和multiPut只定義了如何向state中存取數據。
比如此處的Count,它會將將2個數據相加:
因此新來的統計次數與原有的統計次數加起來即是新的總和。
而對于透明事務狀態,不管txid是否一致,都需要修改state,同時將當前state保存一下,成為preState。非事務型就簡單了,不管你來什么,我都直接更新。
(3)復雜的情況
當然,如果覺得TransactionalMap,OpaqueMap, NonTransactionalMap不能滿足業務需求,則可以自定義一個實現了MapState接口的類,而不是直接使用它們。
反正這三個類的實現邏輯非常簡單,當不能滿足業務需要時,看一下源碼,然后參考它創建自己的類即可,此時,關鍵是multiUpdate的實現。
(4)其它思考
key可以是一個很復雜的List,包括多個字段。
5、MapState讀寫mysql示例
(1)MysqlMapStateFactory
public class MysqlMapStateFactory<T> implements StateFactory {private static final long serialVersionUID = 1987523234141L;@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking());} }很簡單,就一行,返回一個IBacking對象。這里使用的Transactioal,當然還可以使用NonTransactional和Opaque。
(2)MysqlMapStateBacking
最核心的還是multiGet()和multiPut:
@Overridepublic List<TransactionalValue> multiGet(List<List<Object>> keys) {if (stmt == null) {stmt = getStatment();}List<TransactionalValue> values = new ArrayList<TransactionalValue>();for (List<Object> key : keys) {String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'";LOG.debug("============sql: " + sql);try (ResultSet rs = stmt.executeQuery(sql)) {if (rs.next()) {LOG.info("Get value:{} by key:{}", rs.getObject(1), key);values.add(derialize(rs.getObject(1)));} else {values.add(null);}} catch (SQLException e) {e.printStackTrace();}}return values;}@Overridepublic void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {if (stmt == null) {stmt = getStatment();}for (int i = 0; i < keys.size(); i++) {String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i))+ "')";LOG.debug("===================put sql " + sql);try {stmt.execute(sql);} catch (SQLException e) {e.printStackTrace();}}}但mysql與redis之類的不同,它需要將一個TransactionalValue對象轉換為mysql中的一行數據,同理,需要將mysql中的一行數據轉換為一個TransactionalValue對象:
// 將數據庫中的varchar轉換為TransactionalValue對象 private TransactionalValue derialize(Object object) {String value[] = object.toString().split(",");return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1])); }// 將TransactionalValue轉換為String private String serialize(TransactionalValue transactionalValue) {return transactionalValue.getTxid() + "," + transactionalValue.getVal(); }這是使用了最簡單的方式,只有2列,一行是key,一列是value,value中保存了txid及真實的value,之間以逗號分隔。
三、以HBaseMapState為例分析MapState代碼調用全過程
(零)概述 & MapState被調用的全流程代碼
1、調用過程
(1)SubtopologyBolt implements ITridentBatchBolt這個bolt在完成一個batch的處理后會調用finishBatch(BatchInfo batchInfo)
(2)然后調用PartitionPersistProcessor implements TridentProcessor這個處理器的finishBatch(ProcessorContext processorContext)
(3)接著調用MapCombinerAggStateUpdater implements StateUpdater<MapState>的updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector)
(4)再接著調用TransactionalMap<T> implements MapState<T>的 multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
(5)最后就是調用用戶定義的MapState類(如HBaseMapState)的multiGet()和multiPut()方法了。
TOTO:按著這個流程把代碼重頭讀一遍,先了解ITridentBatchBolt。
簡單的說就是一個blot被處理完后,會調用finishBatch()方法,然后這個方法會調用MapState()框架的updateState(),接著調用mutliUpdate(),最后調用用戶定義的multiGet()和multiPut()。
2、內容概述
本部分我們將以HBaseMapState為例,介紹使用MapState保證數據完整唯一的全流程代碼調用,主要分成這幾個部分:
(1)我們先介紹用戶如何在構建代碼中使用這個MapState
(2)然后介紹HBaseMapState的源代碼,這也是用戶需要實現一個MapState的基本方法。
(3)接著介紹MapState框架如何調用用戶定義的代碼形成事務性。
(4)最后介紹storm的內部機制,如何調用MapState。
這也是用戶如何要看源碼的逐步深入的過程。
(一)如何使用MapState
詳細DEMO請見:https://github.com/lujinhong/stormhbasedemo
1、指定一些配置
HBaseMapState.Options option = new HBaseMapState.Options();option.tableName = "ljhtest2";option.columnFamily = "f1";option.mapMapper = new SimpleTridentHBaseMapMapper("ms");SimpleTridentHBaseMapMapper主要用于獲取Rowkey和qualifier。Option的完整選項見下面的源碼分析。
2、指定state
topology.newStream("kafka", kafkaSpout).shuffle().each(new Fields("str"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);這里使用Option對象來構建一個HBaseMapStateFactory。
還可以通過
分別創建非事務與透明的state。
這里使用了storm內建的Count()方法,如果使用Sum,用法如下:
.persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);當然還可以自定義方法,這里自定義方法也就可以自定義保存在hbase的數據類型了。
(二)如何實現一個MapState:HBaseMapState源碼分析
HBaseMapState的主要代碼都在HBaseMapState類中。一個MapState的實現關鍵在于
* 構建一個實現StateFactory的類,實現makeState() 方法,返回一個State對象。
* 一個MapState,實現IBackingMap接口的multiGet()和multiPut(),指定如何從hbase中讀寫數據。
關于mapstate的基礎介紹請參考上面。
1、Option內部類
HBaseMapState有一個內部類:Option,用于指定一些配置項。
public static class Options<T> implements Serializable {public Serializer<T> serializer = null;public int cacheSize = 5000;public String globalKey = "$HBASE_STATE_GLOBAL$";public String configKey = "hbase.config";public String tableName;public String columnFamily;public TridentHBaseMapMapper mapMapper; }分別意思為:
* 序列化器,即以什么格式寫入hbase,storm-hbase自帶了JSON格式的序列化實現。
* 緩沖大小
* 未知
* 指定hbase-site.xml位置的變量
* 表名
* family名
* 用于獲取rowkey和qualifier,創建對象時需要指定一個參數作為qualifier。
2、Factory內部類
(1)構造函數
構造函數接收2個參數,分別為state的類型以及Option對象。
除些以外,還指定了序列化器:
(2)makeState()方法
就是返回一個State對象。
3、構造函數
構造函數用于加載配置文件,安全機制等。
4、返回StateFactory的方法
沒什么好介紹的,就是返回各種類型的staStateFactory,具體的說就是返回上面Factory的一個對象。這里只保留了透明型的。
@SuppressWarnings("rawtypes") public static StateFactory opaque() {Options<OpaqueValue> options = new Options<OpaqueValue>();return opaque(options); }@SuppressWarnings("rawtypes") public static StateFactory opaque(Options<OpaqueValue> opts) {return new Factory(StateType.OPAQUE, opts); }5、multiGet
根據一個List<List<Object>> keys列表獲取到一個返回值的列表List。注意key本身也是一個List<Object>。
代碼主要是三部分:
(1)創建List<Get> gets
(2)查詢hbase:根據gets獲取Result[]
List<T> retval = new ArrayList<T>();Result[] results = this.table.get(gets);(3)將results封裝成一個List<T> retval并返回
for (int i = 0; i < keys.size(); i++) {String qualifier = this.options.mapMapper.qualifier(keys.get(i));Result result = results[i];byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());if(value != null) {retval.add(this.serializer.deserialize(value));} else {retval.add(null);}} return retval;當返回值為空時,則加上null。
6、multiPut
它將一個List<List<Object>> keys, List<T> values的數據寫入hbase,注意keys.size()與values.size()必須相等。
List<Put> puts = new ArrayList<Put>(keys.size());for (int i = 0; i < keys.size(); i++) {byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));String qualifier = this.options.mapMapper.qualifier(keys.get(i));LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});Put put = new Put(hbaseKey);T val = values.get(i);put.add(this.options.columnFamily.getBytes(),qualifier.getBytes(),this.serializer.serialize(val));puts.add(put);this.table.put(puts);7、序列化器
序列化器指定了以何種格式將數據寫入hbase(序列化),以及取出數據后如何進行解釋(反序列化),即關鍵是serialize()與deserialize()這2個方法。
storm默認提供了json的實現,以Transactional為例:
public class JSONTransactionalSerializer implements Serializer<TransactionalValue>它的內部只有2個方法:
@Override public byte[] serialize(TransactionalValue obj) {List toSer = new ArrayList(2);toSer.add(obj.getTxid());toSer.add(obj.getVal());try {return JSONValue.toJSONString(toSer).getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} }它將一個TransactionalValue轉化為json格式,TransactionalValue只有2個變量,是一個典型的bean:
T val; Long txid;而另一個方法deserialize()則剛好相反,它將一個json格式字節流解釋為一個TransactionalValue對象:
@Override public TransactionalValue deserialize(byte[] b) {try {String s = new String(b, "UTF-8");List deser = (List) JSONValue.parse(s);return new TransactionalValue((Long) deser.get(0), deser.get(1));} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} }(三)MapState框架
//TODO:補充各個類的關系圖,參考P323
上述介紹了用戶如何通過實現IBackingMap接口來創建自己的MapState實現,這里我們將介紹MapState框架是如何調用用戶寫的mutliGet()和multiPut方法的。
* 另外,如果上述實現iBackingMap的方法不能滿足你的要求,你可以實現自己的MapState框架,按照這里介紹的方法即可 *
我們主要以Transactional為例,再簡單介紹一下NonTransactional和Opaque的情形。在上面的Factory.makeState()方法中:
IBackingMap state = new HBaseMapState(options, conf, partitionIndex); mapState = TransactionalMap.build(state);state就是用戶代碼定義的MapState實現,此此處是HBaseMapState。我們下面看一下TransactionalMap是如何調用HBaseMapState的mutliGet()和multiPut方法的。
1、build()方法
我們從build方法開始,因為這是用戶創建MapState所調用的API。
public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {return new TransactionalMap<T>(backing); }它使用用戶定義的IBackingMap對象創建一個MapState對象,主要通過構造方法來實現。
2、構造方法
protected TransactionalMap(IBackingMap<TransactionalValue> backing) {_backing = new CachedBatchReadsMap(backing); }3、beginCommit()
@Override public void beginCommit(Long txid) {_currTx = txid;_backing.reset(); }當開始處理一個事務時,設置當前正在處理的txid,reset()是CachedBatchReadsMap類中清空緩存的方法。
TODO: CachedBatchReadsMap分析
4、commit()
@Override public void commit(Long txid) {_currTx = null;_backing.reset(); }當一個事務處理完成后,將txid設置為null。
5、multiGet
@Override public List<T> multiGet(List<List<Object>> keys) {List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);List<T> ret = new ArrayList<T>(vals.size());for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {TransactionalValue v = retval.val;if(v!=null) {ret.add((T) v.getVal());} else {ret.add(null);}}return ret; }通過調用用戶的_backing.multiGet(keys)來實現具體邏輯,作了一些類型轉換。
6、multiPut()
@Override public void multiPut(List<List<Object>> keys, List<T> vals) {List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());for(T val: vals) {newVals.add(new TransactionalValue<T>(_currTx, val));}_backing.multiPut(keys, newVals); }同樣只是調用用戶定位的multiPut()。
7、multiUpdate()
核心的邏輯在于這幾行:
if(val==null) {newVal = new TransactionalValue<T>(_currTx, updater.update(null));changed = true;} else {if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {newVal = val;} else {newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));changed = true;}}ret.add(newVal.getVal());if(changed) {newVals.add(newVal);newKeys.add(keys.get(i));}}在這之前,先把數據get出來,然后判斷:
- 如果key對應的value為空,則changed為true
- 如果key對應的value不為空,而且當前的txid與value中的txid相同,則changed保持為false。
- 如果key對應的value不為空,但當前的txid與value中的txid不同,則changed為true。
這部分邏輯就是Transactional, NonTransactional和Opaque的差別。
NonTransactional不會判斷txid,只要來一批就更新一次。
Opaque基于之前的值作更新。
(四)storm如何調用MapState的代碼
根據前面的分析,用戶在拓撲定義中通過以下類似的代碼來指定state:
topology.newStream("wordsplit", spout).shuffle().each(new Fields("sentence"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);主要看第3、4行,先對數據根據”word”這個流進行分組,然后再調用persistentAggregate()方法。再簡單解釋一下這個方法,3個參數分別為:
* 返回一個StateFactory對象,它有一個makeState()方法,返回一個State對象。這個state對象就是用戶定義的MapState,主要定義了如何從state中讀寫數據。
* 第二個參數表示如何對取出的數據進行什么操作,這里使用的是Count,如是其它類,如Sum,則多一個參數:
* 發送的消息流。
好,我們下面開始分析GroupedStream#persistentAggregate()做了什么東西。
1、GroupedStream類
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), agg, functionFields); }public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(spec, null, agg, functionFields); }public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields); }很簡單的代碼邏輯,先使用StateFactory對象創建一個StateSpec對象,然后繼續調用,從第3個方法可以看出,這里還有一個參數是表示inputFields,即輸入的field,即對哪個field執行CombinerAggregator的操作。StateSpec類的定義非常簡單:
public class StateSpec implements Serializable {public StateFactory stateFactory;public Integer requiredNumPartitions = null;public StateSpec(StateFactory stateFactory) {this.stateFactory = stateFactory;} }最終真正調用的方法是這個:
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields)); }這個方法主要分成2個步驟
* 第一個是調用aggregate()方法,主要如何對數據進行操作。這部分我們以后再分析,反正把它理解為一個數據的更新就好了。
* 第二個是調用partitionPersist()方法,如何將數據寫入state。
構建一個ProcessorNode,然后將它添加進_topology中。
總結
以上是生活随笔為你收集整理的trident State应用指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: trident API指南
- 下一篇: storm UI解释