日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

storm编程指南

發布時間:2024/1/23 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm编程指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

storm編程指南

@(STORM)[storm, 大數據]

  • storm編程指南
    • 一創建spout
    • 二創建split-bolt
    • 三創建wordcount-bolt
    • 四創建report-bolt
    • 五創建topo
    • 六一些說明
      • 1關于分布式編程的一點說明
      • 2關于storm的classpath
    • 七異常處理
      • 1NoClassDefFoundError
      • 2Unsupported majorminor version 510
      • 3Connection failed Netty-Client-gdc-supervisor02-stormineasenet

本文介紹了storm的基本編程,關于trident的編程,請見???

本示例使用storm運行經典的wordcount程序,拓撲如下:
sentence-spout—>split-bolt—>count-bolt—>report-bolt
分別完成句子的產生、拆分出單詞、單詞數量統計、統計結果輸出
完整代碼請見 https://github.com/jinhong-lu/stormdemo
以下是關鍵代碼的分析。

(一)創建spout

public class SentenceSpout extends BaseRichSpout {private SpoutOutputCollector collector;private int index = 0;private String[] sentences = { "when i was young i'd listen to the radio","waiting for my favorite songs", "when they played i'd sing along","it make me smile","those were such happy times and not so long ago","how i wondered where they'd gone","but they're back again just like a long lost friend","all the songs i love so well", "every shalala every wo'wo","still shines.", "every shing-a-ling-a-ling","that they're starting", "to sing so fine"};public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {this.collector = collector;}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void nextTuple() {this.collector.emit(new Values(sentences[index]));index++;if (index >= sentences.length) {index = 0;}try {Thread.sleep(1);} catch (InterruptedException e) {//e.printStackTrace();}} }

上述類中,將string數組中內容逐行發送出去,主要的方法有:

(1)open()方法完成spout的初始化工作,與bolt的prepare()方法類似

(2)declareOutputFileds()定義了發送內容的字段名稱與字段數量,bolt中的方法名稱一樣。

(3)nextTuple()方法是對每一個需要處理的數據均會執行的操作,也bolt的executor()方法類似。它是整個邏輯處理的核心,通過emit()方法將數據發送到拓撲中的下一個節點。

(二)創建split-bolt

public class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}public void execute(Tuple input) {String sentence = input.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){this.collector.emit(new Values(word));//System.out.println(word);}} }

三個方法的含義與spout類似,這個類根據空格把收到的句子進行拆分,拆成一個一個的單詞,然后把單詞逐個發送出去。

input.getStringByField(“sentence”)可以根據上一節點發送的關鍵字獲取到相應的內容。

(三)創建wordcount-bolt

public class WordCountBolt extends BaseRichBolt{private OutputCollector collector;private Map<String,Long> counts = null;public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;this.counts = new HashMap<String, Long>();}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","count"));}public void execute(Tuple input) {String word = input.getStringByField("word");Long count = this.counts.get(word);if(count == null){count = 0L;}count++;this.counts.put(word, count);this.collector.emit(new Values(word,count));//System.out.println(count);} }

本類將接收到的word進行數量統計,并把結果發送出去。
這個bolt發送了2個filed:

declarer.declare(new Fields("word","count"));this.collector.emit(new Values(word,count));

(四)創建report-bolt

public class ReportBolt extends BaseRichBolt{private Map<String, Long> counts;public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.counts = new HashMap<String,Long>();}public void declareOutputFields(OutputFieldsDeclarer declarer) {}public void execute(Tuple input) {String word = input.getStringByField("word");Long count = input.getLongByField("count");counts.put(word, count);}public void cleanup() {System.out.println("Final output");Iterator<Entry<String, Long>> iter = counts.entrySet().iterator();while (iter.hasNext()) {Entry<String, Long> entry = iter.next();String word = (String) entry.getKey();Long count = (Long) entry.getValue();System.out.println(word + " : " + count);}super.cleanup();} }

本類將從wordcount-bolt接收到的數據進行輸出。

先將結果放到一個map中,當topo被關閉時,會調用cleanup()方法,此時將map中的內容輸出。

(五)創建topo

public class WordCountTopology {private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main(String[] args) {SentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SENTENCE_SPOUT_ID, spout);builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,new Fields("word"));builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config conf = new Config();if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology(TOPOLOGY_NAME, conf,builder.createTopology());try {Thread.sleep(10000);} catch (InterruptedException e) {}cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();} else {try {StormSubmitter.submitTopology(args[0], conf,builder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}}} }

關鍵步驟為:
(1)創建TopologyBuilder,并為這個builder指定spout與bolt

builder.setSpout(SENTENCE_SPOUT_ID, spout);builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,new Fields("word"));builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

(2)創建conf對象

Config conf = new Config();

這個對象用于指定一些與拓撲相關的屬性,如并行度、nimbus地址等。
(3)創建并運行拓撲,這里使用了2種方式
一是當沒有參數時,建立一個localcluster,在本地上直接運行,運行10秒后,關閉集群:

LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf,builder.createTopology()); Thread.sleep(10000); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown();

二是有參數是,將拓撲提交到集群中:

StormSubmitter.submitTopology(args[0], conf,builder.createTopology());

第一個參數為拓撲的名稱。

6、本地運行
直接在eclipse中運行即可,輸出結果在console中看到

7、集群運行
(1)編譯并打包

mvn clean compile

(2)把編譯好的jar包上傳到nimbus機器上,然后

storm jar com.ljh.storm.5_stormdemo com.ljh.storm.wordcount.WordCountTopology topology_name

將拓撲提交到集群中。

(六)一些說明

1、關于分布式編程的一點說明

在分布式系統中,由于有多個機器、多個進程在同時運行程序,而它們之間由于運行在不同的JVM中,因此它們之間的變量是無法共享的。

以storm為例:
如果在主程序中設置了某個變量,如

topoName = args[0];

在bolt中想要取得這個變量是不可能的,因為這個變量只保存在了當前的JVM中。
因此,如果在bolt中也要使用這個變量,則必須將其放入一個由分布式系統提供的共享參數中,如:

config.put(Config.TOPOLOGY_NAME, topologyName);

然后,在bolt中的prepare()方法中取得這個參數:

@Override public void prepare(Map conf, TridentOperationContext context) {String topoName = (String) conf.get(Config.TOPOLOGY_NAME); }

其它的分布式系統也類似,切記,不要以為在main函數定義了一個參數,就可以在任何地方使用,它只能在本JVM內使用!!!
一般分布式系統會定義一個context變量,用于傳遞參數。

2、關于storm的classpath

storm作為一個java程序,首先肯定會加載jdk相關的jar包,然后,會查找 $STORM_HOME目錄下的jar包,最后是$STORM_HOME/lib目錄下的jar包。

所以不要以為將jar包移到$STORM_HOME就沒事了,它一樣會被加載。
出現類似NoClassDefFoundError, NoSuchField, IncompatibleClassChangeError: Implementing class等,基本都是由于包重復導致的。

(七)異常處理

1、NoClassDefFoundError

java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics

其實,noClassDefFound一般都是以下2個原因:
(1)classpath中真的沒有這個包,她就是STORM_HOME/lib下沒有metrics-core-2.2.0.jar包。
(2)classpath中有多個不同版本的metrics包,導致沖突,storm不知道應該用哪個。
但這里2種都不是,繼續google,還有可能是
(1)kafka的broker沒啟動,發現不是
(2)zk沒啟動,就是本機上的zk沒啟動,

2、Unsupported major.minor version 51.0

出現這個異常都是由于編譯時使用的jdk版本和運行環境中的jdk版本不一致。

(1)編譯環境

常用的編譯環境有eclipse, ant, maven等。

使用eclipse的話,改一下java compiler即可。

使用maven的話,修改$MAVEN_HOME/conf/settings.xml中的jdk設置。默認情況下是注釋掉的,把注釋去掉,并且改為1.7版本即可。

(2)storm運行環境

修改$STORM_HOME/conf/storm_evn.ini中的
JAVA_HOME,如
參數,
JAVA_HOME:/usr/lib/jvm/java-7-sun

3、Connection failed Netty-Client-gdc-supervisor02-storm.i.nease.net

原因未明,有空再參考:
https://gist.github.com/amontalenti/8ff0c31a7b95a6dea3d2
http://qnalist.com/questions/5058971/storm-workers-not-starting-because-of-netty-reconnect-info-reconnect-started-for-netty-client
https://m.oschina.net/blog/318756

很多都說是由于hostname的配置有問題,再檢查一下

暫時解決辦法:重啟集群!!!

2015-08-14T09:31:21.171+0800 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-gdc-supervisor02-storm.i.nease.net/123.58.172.120:6706
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.7.0_67]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.7.0_67]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.7.0_67]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.7.0_67]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[na:1.7.0_67]
at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.9.4.jar:0.9.4]
at org.apache.storm.netty.util.internal.DeadLockProofWorker1.run(DeadLockProofWorker.java:42)[storm?core?0.9.4.jar:0.9.4]atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[na:1.7.067]atjava.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]

總結

以上是生活随笔為你收集整理的storm编程指南的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。