storm编程指南
storm編程指南
@(STORM)[storm, big data]
- Storm Programming Guide
- (1) Creating the spout
- (2) Creating the split-bolt
- (3) Creating the wordcount-bolt
- (4) Creating the report-bolt
- (5) Creating the topology
- (6) Some notes
- 1. A note on distributed programming
- 2. On Storm's classpath
- (7) Exception handling
- 1. NoClassDefFoundError
- 2. Unsupported major.minor version 51.0
- 3. Connection failed Netty-Client-gdc-supervisor02-storm.i.nease.net
This article covers basic Storm programming; for Trident programming, see ???
This example runs the classic word-count program on Storm. The topology is:

sentence-spout —> split-bolt —> count-bolt —> report-bolt

These components respectively generate sentences, split them into words, count each word, and output the results.

For the complete code, see https://github.com/jinhong-lu/stormdemo

Below is an analysis of the key code.
(1) Creating the spout
```java
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
            "when i was young i'd listen to the radio",
            "waiting for my favorite songs",
            "when they played i'd sing along",
            "it make me smile",
            "those were such happy times and not so long ago",
            "how i wondered where they'd gone",
            "but they're back again just like a long lost friend",
            "all the songs i love so well",
            "every shalala every wo'wo",
            "still shines.",
            "every shing-a-ling-a-ling",
            "that they're starting",
            "to sing so fine" };

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}
```

This class emits the contents of the string array line by line. Its main methods are:
(1) open() performs the spout's initialization, similar to a bolt's prepare() method.

(2) declareOutputFields() defines the names and number of the emitted fields; the method has the same name in bolts.

(3) nextTuple() is invoked for each piece of data to be processed, similar to a bolt's execute() method. It is the core of the processing logic: emit() sends the data to the next node in the topology.
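The wrap-around emission in nextTuple() can be illustrated with a standalone sketch (plain Java, no Storm dependencies; the class and method names here are illustrative, not from the original project):

```java
public class CyclingIndexSketch {
    // Mirrors the spout's nextTuple() pattern: each call returns the
    // next sentence, wrapping back to the start of the array.
    private int index = 0;
    private final String[] sentences = { "a", "b", "c" };

    public String next() {
        String s = sentences[index];
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        return s;
    }
}
```

Calling next() repeatedly yields a, b, c, a, b, c, ... just as the spout re-emits its sentences in a loop.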
(2) Creating the split-bolt
```java
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
            //System.out.println(word);
        }
    }
}
```

The three methods have the same meaning as in the spout. This class splits each received sentence on spaces into individual words and emits them one by one.
input.getStringByField("sentence") retrieves the content emitted by the upstream node via its field name.
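The core of the split step is just String.split on a single space; a minimal standalone sketch (plain Java, illustrative names):

```java
public class SplitSketch {
    // Mirrors the split-bolt's core logic: break a sentence into
    // words on single spaces; each word would then be emitted.
    public static String[] splitSentence(String sentence) {
        return sentence.split(" ");
    }
}
```

For example, splitSentence("when i was young") returns the four words when, i, was, young.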
(3) Creating the wordcount-bolt
```java
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Long> counts = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
        //System.out.println(count);
    }
}
```

This class keeps a running count of each received word and emits the result.
This bolt emits two fields: word and count.
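The per-word running count can be exercised outside Storm with a plain-Java sketch (illustrative names, no Storm dependencies):

```java
import java.util.HashMap;
import java.util.Map;

public class CountSketch {
    // Mirrors the count-bolt's core logic: keep a running total per
    // word. Each incoming word updates the map; in the real bolt the
    // updated (word, count) pair would then be emitted downstream.
    private final Map<String, Long> counts = new HashMap<>();

    public long onWord(String word) {
        return counts.merge(word, 1L, Long::sum);
    }
}
```

Feeding it "storm", "storm", "bolt" yields counts 1, 2, 1, matching what the bolt would emit for that input.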
(4) Creating the report-bolt
```java
public class ReportBolt extends BaseRichBolt {
    private Map<String, Long> counts;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        counts.put(word, count);
    }

    public void cleanup() {
        System.out.println("Final output");
        Iterator<Entry<String, Long>> iter = counts.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, Long> entry = iter.next();
            String word = (String) entry.getKey();
            Long count = (Long) entry.getValue();
            System.out.println(word + " : " + count);
        }
        super.cleanup();
    }
}
```

This class outputs the data received from the wordcount-bolt.
The results are first stored in a map; when the topology is shut down, cleanup() is invoked and the map's contents are printed.
(5) Creating the topology
```java
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config conf = new Config();
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
```

The key steps are:
(1) Create a TopologyBuilder and register the spout and bolts with it.
(2) Create the conf object:

```java
Config conf = new Config();
```

This object specifies topology-related properties such as parallelism and the nimbus address.
(3) Create and run the topology, in one of two ways.

One way: with no arguments, a LocalCluster is created and the topology runs directly on the local machine; after 10 seconds the cluster is shut down.
The other way: with arguments, the topology is submitted to the cluster:

```java
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
```

The first argument is the topology name.
(4) Running locally

Just run it in Eclipse; the output appears in the console.
(5) Running on a cluster
(1) Compile and package the code.
(2) Upload the built jar to the nimbus machine, then submit the topology to the cluster:

```
storm jar com.ljh.storm.5_stormdemo com.ljh.storm.wordcount.WordCountTopology topology_name
```
(6) Some notes
1. A note on distributed programming
In a distributed system, many machines and processes run the program concurrently, each in its own JVM, so variables cannot be shared between them.
Take Storm as an example: suppose you set a variable in the main program. A bolt cannot read that variable, because it exists only in the submitting JVM.

Therefore, if a bolt also needs the value, it must be put into a shared parameter provided by the distributed system (in Storm, the topology conf), and then read back in the bolt's prepare() method:
```java
@Override
public void prepare(Map conf, TridentOperationContext context) {
    String topoName = (String) conf.get(Config.TOPOLOGY_NAME);
}
```

Other distributed systems work similarly. Remember: a parameter defined in main() is usable only within that JVM, not everywhere!
Distributed systems generally define a context or conf object of some kind for passing parameters.
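The whole pattern can be sketched without Storm at all (plain Java; the map stands in for the topology conf, and the key name my.custom.param is made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfPassingSketch {
    // Stand-in for a bolt: it only sees what arrives through the conf
    // map handed to prepare(), never variables from the submitting JVM.
    static String preparedValue;

    public static void prepare(Map<String, Object> conf) {
        preparedValue = (String) conf.get("my.custom.param");
    }

    public static void main(String[] args) {
        // Submitting side: put the value into the (serialized) conf.
        Map<String, Object> conf = new HashMap<>();
        conf.put("my.custom.param", "hello");

        // Worker side: only the conf contents are available.
        prepare(conf);
        System.out.println(preparedValue); // prints "hello"
    }
}
```

In real Storm code the submitting side would put the value into the Config object before calling submitTopology(), and the bolt would read it back in prepare() just as shown above.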
2. On Storm's classpath
As a Java program, Storm first loads the JDK jars, then looks for jars directly under $STORM_HOME, and finally those under $STORM_HOME/lib.

So don't assume that moving a jar up into $STORM_HOME takes it off the classpath; it will still be loaded.
Errors such as NoClassDefFoundError, NoSuchFieldError, or IncompatibleClassChangeError: Implementing class are almost always caused by duplicate jars.
(7) Exception handling
1. NoClassDefFoundError
java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics
NoClassDefFoundError usually has one of two causes:

(1) The jar really is missing from the classpath; here that would mean $STORM_HOME/lib lacks metrics-core-2.2.0.jar.

(2) The classpath contains multiple versions of the metrics jar, causing a conflict, and Storm doesn't know which one to use.

Neither applied in this case. Further searching turned up two more possibilities:

(1) The Kafka broker is not running; that was not the problem here.

(2) ZooKeeper is not running; this turned out to be the cause: the local ZooKeeper had not been started.
2. Unsupported major.minor version 51.0
This exception occurs when the JDK version used at compile time differs from the JDK version in the runtime environment.
(1) Build environment

Common build tools include Eclipse, Ant, and Maven.

With Eclipse, just change the Java compiler setting.

With Maven, adjust the JDK setting in $MAVEN_HOME/conf/settings.xml; it is commented out by default, so uncomment it and set it to 1.7.
(2) Storm runtime environment

Set the JAVA_HOME parameter in $STORM_HOME/conf/storm_env.ini, for example:

```
JAVA_HOME:/usr/lib/jvm/java-7-sun
```
3. Connection failed Netty-Client-gdc-supervisor02-storm.i.nease.net
The cause is unclear; references to consult later:
https://gist.github.com/amontalenti/8ff0c31a7b95a6dea3d2
http://qnalist.com/questions/5058971/storm-workers-not-starting-because-of-netty-reconnect-info-reconnect-started-for-netty-client
https://m.oschina.net/blog/318756
Many reports say it is caused by a misconfigured hostname, so check that as well.

Temporary workaround: restart the cluster!
```
2015-08-14T09:31:21.171+0800 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-gdc-supervisor02-storm.i.nease.net/123.58.172.120:6706
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.7.0_67]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.7.0_67]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.7.0_67]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.7.0_67]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[na:1.7.0_67]
    at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-core-0.9.4.jar:0.9.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
```