[Big Data] Storm (0.9 and Later) Environment Setup on Linux, with a Small Demo
I. Introduction
Since the 0.9.x releases, configuring Storm has become much simpler: only ZooKeeper and Storm itself need to be set up, and ZeroMQ and JZMQ are no longer required. Because the vast majority of Storm setup guides online target pre-0.9 versions, they include a lot of work that is no longer necessary. This post walks through configuring Storm 0.9.5 on Linux in detail.
With only two components to configure, setup is greatly simplified. This took real effort from the Storm team, and it lets programmers focus on their programs by making Storm setup exceptionally simple. Enough preamble; let's get started.
II. Configuring ZooKeeper
1. Open a shell; the download location is up to you. Download ZooKeeper 3.4.6 (a stable release) with the following command:
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
2. After the download finishes, extract it with:
tar -zxvf zookeeper-3.4.6.tar.gz
This produces a folder named zookeeper-3.4.6.
3. Enter the zookeeper-3.4.6/conf folder and copy zoo_sample.cfg to a file named zoo.cfg.
4. Edit zoo.cfg and add the following (note that zoo.cfg only supports comments on their own lines, starting with #):
# where ZooKeeper stores its snapshot data
dataDir=/home/leesf/program/zookeeper/data
# where ZooKeeper stores its transaction logs
dataLogDir=/home/leesf/program/zookeeper/log
# a single local server; for a multi-machine cluster, add one entry per machine,
# e.g. server.2=xxx.xxx.xxx.xxx:2888:3888, and so on
server.1=127.0.0.1:2888:3888
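ZooKeeper will usually create these directories itself, but creating them up front avoids permission surprises. Note also that if you later add more server.N entries to form a real quorum, each machine needs a myid file under its dataDir containing that machine's server number. A minimal sketch, assuming the paths configured above:
mkdir -p /home/leesf/program/zookeeper/data /home/leesf/program/zookeeper/log
echo 1 > /home/leesf/program/zookeeper/data/myid    # matches server.1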
5. From the shell, enter the zookeeper-3.4.6/bin directory; the following commands start, check, and stop the ZooKeeper service:
./zkServer.sh start     # start the service
./zkServer.sh status    # check the service status
./zkServer.sh stop      # stop the service
III. Configuring Storm
1. Download Storm with the following command:
wget http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz
2. Extract it with:
tar -zxvf apache-storm-0.9.5.tar.gz
Extraction produces the folder apache-storm-0.9.5.
3. Edit the storm.yaml file in the apache-storm-0.9.5/conf directory, adding the content below. Two pitfalls: these settings ship commented out with # in the template and must be uncommented to take effect, and YAML requires a space after each colon and after each list dash:
storm.zookeeper.servers:
    - "127.0.0.1"

nimbus.host: "127.0.0.1"

storm.zookeeper.port: 2181

storm.local.dir: "/home/leesf/program/storm/data"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
4. Enter the apache-storm-0.9.5/bin directory and start nimbus, supervisor, and ui with the following commands:
./storm nimbus
./storm supervisor
./storm ui
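Each of these commands stays in the foreground, so either give each process its own terminal or push them into the background. One possible sketch (Storm keeps writing its own log files under the logs/ directory regardless):
nohup ./storm nimbus &
nohup ./storm supervisor &
nohup ./storm ui &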
5. View the Storm UI by opening 127.0.0.1:8080 in a browser.
At this point, the Storm setup is complete.
Next, let's run a small demo in Storm's local mode so you can see Storm in action.
IV. Storm Demo
The Storm demo is laid out as follows (a sketch of the concrete directory layout follows this list):
1. The spout package: the data source that emits the tuples.
2. The bolt package: the data-processing nodes.
3. The main package: the program entry point.
4. words.txt: the program's resource file.
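Concretely, assuming an Eclipse project named StormDemo (the name is taken from the resource path used in Main.java below), the layout looks roughly like this:
StormDemo/
    src/com/leesf/Spout/WordReader.java
    src/com/leesf/Bolt/WordNormalizer.java
    src/com/leesf/Bolt/WordCounter.java
    src/com/leesf/Main/Main.java
    res/words.txt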
The steps are as follows:
1. Add the source code:
1. The spout package contains a single Java file, WordReader.java, with the following code:
package com.leesf.Spout;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit one tuple per file line.
     */
    public void nextTuple() {
        /*
         * nextTuple() is called forever, so once the file has been
         * fully read we just sleep briefly and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit a tuple for each line, using the line itself as the message id
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the words file and keep a reference to the collector.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the single output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

2. The bolt package contains two Java files, WordCounter.java and WordNormalizer.java, with the following code:
WordCounter.java is as follows:
package com.leesf.Bolt;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the run (when the cluster is shut down)
     * we print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * Initialize the counter map on creation.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /*
         * If the word is not yet in the map, add it with a count of 1;
         * otherwise increment its count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java is as follows:
package com.leesf.Bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * Receive a line from the words file and normalize it:
     * lower-case the line and split it into individual words.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * This bolt emits only the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

3. The main package contains a single Java file, Main.java, with the following code:
package com.leesf.Main;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import com.leesf.Bolt.*;
import com.leesf.Spout.*;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", "/home/leesf/code/eclipse/StormDemo/res/words.txt");
        conf.setDebug(false);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

4. The resource file, words.txt, has the following content:
storm test are great is an storm simple application but very powerfull really StOrm is great

words.txt can be placed anywhere; just update the corresponding path in Main.java so the two stay consistent.
2. Add the dependency libraries
Add all the jar files under the storm/lib directory to the project's build path.
3. Run the program
Run the program and you will get the word-count results.
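With the sample words.txt above, the counts printed by WordCounter's cleanup() should come out as below. The entries live in a HashMap, so their order, and the task id (shown here as 2), will vary from run to run:
-- Word Counter [word-counter-2] --
storm: 3
is: 2
great: 2
test: 1
are: 1
an: 1
simple: 1
application: 1
but: 1
very: 1
powerfull: 1
really: 1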
At this point, all of the Storm setup is done and you can move on to Storm development.
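The demo above runs on a LocalCluster, entirely inside one JVM. To submit the same topology to the cluster configured in section III, the standard entry point in Storm 0.9.x is StormSubmitter. Below is a minimal sketch under some assumptions: the class name ClusterMain and the topology name are made up for illustration, and the words file path passed on the command line must exist on the worker machines.

package com.leesf.Main;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import com.leesf.Bolt.*;
import com.leesf.Spout.*;

public class ClusterMain {
    public static void main(String[] args) throws Exception {
        // Same topology wiring as in Main.java above
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", args[0]); // path to the words file on the worker machines
        conf.setNumWorkers(2);          // request two worker processes

        // Submits to the cluster described by storm.yaml instead of running in-process
        StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());
    }
}

It would be packaged into a jar and launched through the storm client, for example ./storm jar StormDemo.jar com.leesf.Main.ClusterMain /path/to/words.txt (the jar name here is hypothetical).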
Summary: since the 0.9 release, Storm setup has become very simple, with no more need to configure ZeroMQ and JZMQ. Most of the setup guides online still target pre-0.9 versions and are therefore needlessly cumbersome, so I am recording this setup process here, both for fellow readers and for my own future reference. If you run into any problems during setup, feel free to discuss; thanks for reading.
Reference:
http://blog.csdn.net/w13770269691/article/details/38982397
Reposted from: https://www.cnblogs.com/leesf456/p/4790098.html