trident原理及编程指南
trident原理及編程指南
@(STORM)[storm, 大數(shù)據(jù)]
- trident原理及編程指南
- 一理論介紹
- 一trident是什么
- 二trident處理單位
- 三事務(wù)類型
- 1spout類型
- 2state類型
- 3實(shí)現(xiàn)恰好一次的spout與state組合類型
- 二編程指南
- 1定義輸入流
- 2統(tǒng)計(jì)單詞數(shù)量
- 3輸出統(tǒng)計(jì)結(jié)果
- 4split的字義
- 三使用kafka作為數(shù)據(jù)源
- 1定義kafka相關(guān)配置
- 2從kafka中讀取消息并處理
- 3提交拓?fù)?/li>
- 四State示例
- 1主類
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓?fù)涠x
- 2工廠類NameSumStateFactory
- 3更新類NameSumUpdater
- 4狀態(tài)類NameSumState
- 4state應(yīng)用思路總結(jié)
一、理論介紹
(一)trident是什么?
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you’re familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
簡(jiǎn)單的說,trident是storm的更高層次抽象,相對(duì)storm,它主要提供了3個(gè)方面的好處:
(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調(diào)用,不需要自己實(shí)現(xiàn)。
(2)以批次代替單個(gè)元組,每次處理一個(gè)批次的數(shù)據(jù)。
(3)提供了事務(wù)支持,可以保證數(shù)據(jù)均處理且只處理了一次。
(二)trident處理單位
trident每次處理消息均為batch為單位,即一次處理多個(gè)元組。
(三)事務(wù)類型
關(guān)于事務(wù)類型,有2個(gè)比較容易混淆的概念:spout的事務(wù)類型以及事務(wù)狀態(tài)。
它們都有3種類型,分別為:事務(wù)型、非事務(wù)型和透明事務(wù)型。
1、spout類型
spout的類型指定了由于下游出現(xiàn)問題(fail被調(diào)用,或者超時(shí)無回復(fù))導(dǎo)致元組需要重放時(shí),應(yīng)該怎么發(fā)送元組。
事務(wù)型spout:重放時(shí)能保證同一個(gè)批次發(fā)送同一批元組??梢员WC每一個(gè)元組都被發(fā)送且只發(fā)送一個(gè),且同一個(gè)批次所發(fā)送的元組是一樣的。
非事務(wù)型spout:沒有任何保障,發(fā)完就算。
透明事務(wù)型spout:同一個(gè)批次發(fā)送的元組有可能不同的,它可以保證每一個(gè)元組都被發(fā)送且只發(fā)送一次,但不能保證重放時(shí)同一個(gè)批次的數(shù)據(jù)是一樣的。這對(duì)于部分失效的情況尤其有用,假如以kafka作為spout,當(dāng)一個(gè)topic的某個(gè)分區(qū)失效時(shí),可以用其它分區(qū)的數(shù)據(jù)先形成一個(gè)批次發(fā)送出去,如果是事務(wù)型spout,則必須等待那個(gè)分區(qū)恢復(fù)后才能繼續(xù)發(fā)送。
這三種類型可以分別通過實(shí)現(xiàn)ITransactionalSpout、ITridentSpout、IOpaquePartitionedTridentSpout接口來定義。
2、state類型
state的類型指定了如果將storm的中間輸出或者最終輸出持久化到某個(gè)地方(如內(nèi)存),當(dāng)某個(gè)批次的數(shù)據(jù)重放時(shí)應(yīng)該如果更新狀態(tài)。state對(duì)于下游出現(xiàn)錯(cuò)誤的情況尤其有用。
事務(wù)型狀態(tài):同一批次tuple提供的結(jié)果是相同的。
非事務(wù)型狀態(tài):沒有回滾能力,更新操作是永久的。
透明事務(wù)型狀態(tài):更新操作基于先前的值,這樣由于這批數(shù)據(jù)發(fā)生變化,對(duì)應(yīng)的結(jié)果也會(huì)發(fā)生變化。透明事務(wù)型狀態(tài)除了保存當(dāng)前數(shù)據(jù)外,還要保存上一批數(shù)據(jù),當(dāng)數(shù)據(jù)重放時(shí),可以基于上一批數(shù)據(jù)作更新。
注意,此處的狀態(tài)應(yīng)該是原子性的,比如將狀態(tài)寫入hbase,則應(yīng)該全部寫入,或者全部沒寫入,不能說寫入一半,另一半沒寫入,這連事務(wù)型也無法保證恰好一次了。比如說寫入本地磁盤,就有可能導(dǎo)致這種情況,如果寫到一半出錯(cuò),則無法保證恰好一次了,因?yàn)榇疟P沒有類似于數(shù)據(jù)庫的commit、rollback操作。
3、實(shí)現(xiàn)恰好一次的spout與state組合類型
由上表可以看出:
(1)當(dāng)spout與state均為transcational或者均為opaque時(shí),可以實(shí)現(xiàn)恰好一次。
(2)當(dāng)spout為tansactional,state為opaque時(shí),也可以實(shí)現(xiàn)恰好一次。
(3)但當(dāng)spout為opaque,state為transactional時(shí),不可以實(shí)現(xiàn)恰好一次。因此opaque spout重發(fā)時(shí),它的內(nèi)容可能與上一次不同,而state如果在上個(gè)批次已經(jīng)更新過但這個(gè)批次最終fail了,則spout重發(fā)時(shí),會(huì)在已經(jīng)fail掉的批次上更新,而上一個(gè)批次是不應(yīng)該計(jì)算在內(nèi)的。如果state是transactional的,則它同時(shí)保存了上一次狀態(tài)及當(dāng)前狀態(tài),所以可以基于上一次的狀態(tài)作更新,就不會(huì)有這個(gè)問題。
二、編程指南
代碼如下
package org.ljh.tridentdemo;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple;public class TridentWordCount {public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}}public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);//創(chuàng)建拓?fù)鋵?duì)象TridentTopology topology = new TridentTopology();//這個(gè)流程用于統(tǒng)計(jì)單詞數(shù)據(jù),結(jié)果將被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);//這個(gè)流程用于查詢上面的統(tǒng)計(jì)結(jié)果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();}public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));}} }實(shí)例實(shí)現(xiàn)了最基本的wordcount功能,然后將結(jié)果輸出。關(guān)鍵步驟如下:
1、定義輸入流
FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);(1)使用FixedBatchSpout創(chuàng)建一個(gè)輸入spout,spout的輸出字段為sentence,每3個(gè)元組作為一個(gè)batch。
(2)數(shù)據(jù)不斷的重復(fù)發(fā)送。
2、統(tǒng)計(jì)單詞數(shù)量
TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);這個(gè)流程用于統(tǒng)計(jì)單詞數(shù)據(jù),結(jié)果將被保存在wordCounts中。6行代碼的含義分別為:
(1)首先從spout中讀取消息,spout1定義了zookeeper中用于保存這個(gè)拓?fù)涞墓?jié)點(diǎn)名稱。
(2)并行度設(shè)置為16,即16個(gè)線程同時(shí)從spout中讀取消息。
(3)each中的三個(gè)參數(shù)分別為:輸入字段名稱,處理函數(shù),輸出字段名稱。即從字段名稱叫sentence的數(shù)據(jù)流中讀取數(shù)據(jù),然后經(jīng)過new Split()處理后,以word作為字段名發(fā)送出去。其中new Split()后面介紹,它的功能就是將輸入的內(nèi)容以空格為界作了切分。
(4)將字段名稱為word的數(shù)據(jù)流作分組,即相同值的放在一組。
(5)將已經(jīng)分好組的數(shù)據(jù)作統(tǒng)計(jì),結(jié)果放到MemoryMapState,然后以count作為字段名稱將結(jié)果發(fā)送出去。這步驟會(huì)同時(shí)存儲(chǔ)數(shù)據(jù)及狀態(tài),并將返回TridentState對(duì)象。
(6)并行度設(shè)置。
3、輸出統(tǒng)計(jì)結(jié)果
topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));這個(gè)流程從上述的wordCounts對(duì)象中讀取結(jié)果,并返回。6行代碼的含義分別為:
(1)等待一個(gè)drpc調(diào)用,從drpc服務(wù)器中接受words的調(diào)用來提供消息。調(diào)用代碼如下:
drpc.execute("words", "cat the dog jumped")(2)輸入為上述調(diào)用中提供的參數(shù),經(jīng)過Split()后,以word作為字段名稱發(fā)送出去。
(3)以word的值作分組。
(4)從wordCounts對(duì)象中查詢結(jié)果。4個(gè)參數(shù)分別代表:數(shù)據(jù)來源,輸入數(shù)據(jù),內(nèi)置方法(用于從map中根據(jù)key來查找value),輸出名稱。
(5)過濾掉空的查詢結(jié)果,如本例中,cat和dog都沒有結(jié)果。
(6)將結(jié)果作統(tǒng)計(jì),并以sum作為字段名稱發(fā)送出去,這也是DRPC調(diào)用所返回的結(jié)果。如果沒有這一行,最后的輸出結(jié)果
加上這一行后,結(jié)果為:
DRPC RESULT: [[180]]4、split的字義
public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}} }注意它最后會(huì)發(fā)送數(shù)據(jù)。
5、創(chuàng)建并啟動(dòng)拓?fù)?/p> public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));} }
(1)當(dāng)無參數(shù)運(yùn)行時(shí),啟動(dòng)一個(gè)本地的集群,及自已創(chuàng)建一個(gè)drpc對(duì)象來輸入。
(2)當(dāng)有參數(shù)運(yùn)行時(shí),設(shè)置worker數(shù)量為3,然后提交拓?fù)涞郊?#xff0c;并等待遠(yuǎn)程的drpc調(diào)用。
三、使用kafka作為數(shù)據(jù)源
package com.netease.sytopology;import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.Arrays;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import storm.kafka.BrokerHosts; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import storm.kafka.trident.OpaqueTridentKafkaSpout; import storm.kafka.trident.TridentKafkaConfig; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/** 本類完成以下內(nèi)容*/ public class SyTopology {public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);private final BrokerHosts brokerHosts;public SyTopology(String kafkaZookeeper) {brokerHosts = new ZkHosts(kafkaZookeeper);}public StormTopology buildTopology() {TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());// TransactionalTridentKafkaSpout kafkaSpout = new// TransactionalTridentKafkaSpout(kafkaConfig);OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);TridentTopology topology = new TridentTopology();// TridentState wordCounts =topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);// .persistentAggregate(new HazelCastStateFactory(), new Count(),// new Fields("aggregates_words")).parallelismHint(2);return topology.build();}public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {String kafkaZk = args[0];SyTopology topology = new SyTopology(kafkaZk);Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);String name = args[1];String dockerIp = args[2];config.setNumWorkers(9);config.setMaxTaskParallelism(5);config.put(Config.NIMBUS_HOST, dockerIp);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));StormSubmitter.submitTopology(name, config, topology.buildTopology());}static class Split extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(",")) {try {FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);fw.write(word);fw.flush();fw.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}collector.emit(new Values(word));}}} }本例將從kafka中讀取消息,然后對(duì)消息根據(jù)“,”作拆分,并寫入一個(gè)本地文件。
1、定義kafka相關(guān)配置
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);其中ma30是訂閱的topic名稱。
2、從kafka中讀取消息并處理
topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);(1)指定了數(shù)據(jù)來源,并指定zookeeper中用于保存數(shù)據(jù)的位置,即保存在/transactional/kafka4。
(2)指定處理方法及發(fā)射的字段。
(3)根據(jù)word作分組。
(4)計(jì)數(shù)后將狀態(tài)寫入MemoryMapState
四、State示例
trident通過spout的事務(wù)性與state的事務(wù)處理,保證了恰好一次的語義。這里介紹了如何使用state。
完整代碼請(qǐng)見 https://github.com/lujinhong/tridentdemo
1、主類
主類定義了拓?fù)涞恼w邏輯,這個(gè)拓?fù)渫ㄟ^一個(gè)固定的spout循環(huán)產(chǎn)生數(shù)據(jù),然后統(tǒng)計(jì)消息中每個(gè)名字出現(xiàn)的次數(shù)。
拓?fù)渲邢葘⑾⒅械膬?nèi)容提取出來成name, age, title, tel4個(gè)field,然后通過project只保留name字段供統(tǒng)計(jì),接著按照name分區(qū)后,為每個(gè)分區(qū)進(jìn)行聚合,最后將聚合結(jié)果通過state寫入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其實(shí)沒什么必要,上面就不需要發(fā)射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根據(jù)name的值作分區(qū)Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());2、Aggregator的用法
這里涉及了一些trident常用的API,但project等相對(duì)容易理解,這里只介紹partitionAggregate的用法。
再看看上面代碼中對(duì)partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))第一,三個(gè)參數(shù)分別表示輸入流的名稱與輸出流的名稱。中間的NameCountAggregator是一個(gè)Aggregator的對(duì)象,它定義了如何對(duì)輸入流進(jìn)行聚合。我們看一下它的代碼:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個(gè)名字是否已經(jīng)存在于map中,若無,則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結(jié)果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}(1)Aggregator接口
它實(shí)現(xiàn)了Aggregator接口,這個(gè)接口有3個(gè)方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }init方法:在處理batch之前被調(diào)用。init的返回值是一個(gè)表示聚合狀態(tài)的對(duì)象,該對(duì)象會(huì)被傳遞到aggregate和complete方法。
aggregate方法:為每個(gè)在batch分區(qū)的輸入元組所調(diào)用,更新狀態(tài)
complete方法:當(dāng)batch分區(qū)的所有元組已經(jīng)被aggregate方法處理完后被調(diào)用。
除了實(shí)現(xiàn)Aggregator接口,還可以實(shí)現(xiàn)ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見《從零開始學(xué)storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我們看一下這3個(gè)方法的實(shí)現(xiàn)。
(2)init方法
@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }僅初始化了一個(gè)HashMap對(duì)象,這個(gè)對(duì)象會(huì)作為參數(shù)傳給aggregate和complete方法。對(duì)一個(gè)batch只執(zhí)行一次。
(3)aggregate方法
aggregate方法對(duì)于batch內(nèi)的每一個(gè)tuple均執(zhí)行一次。這里將這個(gè)batch內(nèi)的名字出現(xiàn)的次數(shù)放到init方法所初始化的map中。
@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }(4)complete方法
這里在complete將aggregate處理完的結(jié)果發(fā)送出去,實(shí)際上可以在任何地方emit,比如在aggregate里面。
這個(gè)方法對(duì)于一個(gè)batch也只執(zhí)行一次。
3、state的用法
(1)拓?fù)涠x
先看一下主類中如何將結(jié)果寫入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());它的定義為:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)其中的第二個(gè)參數(shù)比較容易理解,就是輸入流的名稱,這里是名字與它出現(xiàn)的個(gè)數(shù)。下面先看一下Facotry。
(2)工廠類:NameSumStateFactory
很簡(jiǎn)單,它實(shí)現(xiàn)了StateFactory,只有一個(gè)方法makeState,返回一個(gè)State類型的對(duì)象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); } }(3)更新類:NameSumUpdater
這個(gè)類繼承自BaseStateUpdater,它的updateState對(duì)batch的內(nèi)容進(jìn)行處理,這里是將batch的內(nèi)容放到一個(gè)map中,然后調(diào)用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);} }(4)狀態(tài)類:NameSumState
這是state最核心的類,它實(shí)現(xiàn)了大部分的邏輯。NameSumState實(shí)現(xiàn)了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid); }分別在提交之前與提交成功的時(shí)候調(diào)用,在這里只打印了一些信息。
另外NameSumState還定義了如何處理NameSumUpdater傳遞的消息:
public void setBulk(Map<String, Integer> map) {// 將新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 將map中的當(dāng)前狀態(tài)打印出來。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);} }即將NameSumUpdater傳送過來的內(nèi)容寫入一個(gè)HashMap中,并打印出來。
此處將state記錄在一個(gè)HashMap中,如果需要記錄在其它地方,如mysql,則使用jdbc寫入mysql代替下面的map操作即可。
事實(shí)上,這個(gè)操作不一定要在state中執(zhí)行,可以在任何類中,但建議還是在state類中實(shí)現(xiàn)。
4、state應(yīng)用思路總結(jié)
(1)使用state,你不再需要比較事務(wù)id,在數(shù)據(jù)庫中同時(shí)寫入多個(gè)值等內(nèi)容,而是專注于你的邏輯實(shí)現(xiàn)
(2)除了實(shí)現(xiàn)State接口,更常用的是實(shí)現(xiàn)MapState接口,下次補(bǔ)充。
(3)在拓?fù)渲兄付薙tateFactory,這個(gè)工廠類找到相應(yīng)的State類。而Updater則每個(gè)批次均會(huì)調(diào)用它的方法。State中則定義了如何保存數(shù)據(jù),這里將數(shù)據(jù)保存在內(nèi)存中的一個(gè)HashMap,還可以保存在mysql, hbase等等。
(4)trident會(huì)自動(dòng)比較txid的值,如果和當(dāng)前一樣,則不更改狀態(tài),如果是當(dāng)前txid的下一個(gè)值,則更新狀態(tài)。這種邏輯不需要用戶處理。
(5)如果需要實(shí)現(xiàn)透明事務(wù)狀態(tài),則需要保存當(dāng)前值與上一個(gè)值,在update的時(shí)候2個(gè)要同時(shí)處理。即邏輯由自己實(shí)現(xiàn)。在本例子中,大致思路是在NameSumState中創(chuàng)建2個(gè)HashMap,分別對(duì)應(yīng)當(dāng)前與上一個(gè)狀態(tài)的值,而NameSumUpdater每次更新這2個(gè)Map。
總結(jié)
以上是生活随笔為你收集整理的trident原理及编程指南的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。