Storm 1.1.0模拟单词统计功能,Spout编写,Bolt编写,TopologyDriver编写,本地模式运行,集群模式运行,集群模式下看输出结果
統計文本中的單詞出現的頻率,其中文本內容如下:
創建項目
項目結構如下:
創建pom.xml,代碼如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.strom</groupId>
    <artifactId>wordCountStromDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!-- Enable the provided scope below when packaging for a cluster, which already
                 ships storm-core; keep it commented out when running in local mode. -->
            <!--<scope>provided</scope>-->
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds the *-jar-with-dependencies.jar during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Main class to run; change the package name below to match your own project. -->
                            <mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
<!-- Note the mainClass setting above: the package name must be adjusted to match your own project. -->
使用spout讀取數據,其中MyLocalFileSpout的代碼如下:
package cn.toto.strom.wordcount;import org.apache.commons.lang.StringUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields;import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Map;/*** Created by maoxiangyi on 2016/8/16.*/ public class MyLocalFileSpout extends BaseRichSpout {private SpoutOutputCollector collector;private BufferedReader bufferedReader;//初始化方法public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;try {this.bufferedReader = new BufferedReader(new FileReader(new File("/home/tuzq/software/stormInstallPath/workdir/aaa.txt")));} catch (FileNotFoundException e) {e.printStackTrace();}}//Storm實時計算的特性就是對數據一條一條的處理//while(true){// this.nextTuple()// }public void nextTuple() {//每被調用一次就會發送一條數據出去try {String line = bufferedReader.readLine();if (StringUtils.isNotBlank(line)){List<Object> arrayList = new ArrayList<Object>();arrayList.add(line);collector.emit(arrayList);}} catch (IOException e) {e.printStackTrace();}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("juzi"));} }使用bolt對單詞進行分割,MySplitBolt的代碼如下:
package cn.toto.strom.wordcount;import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class MySplitBolt extends BaseBasicBolt {public void execute(Tuple input, BasicOutputCollector collector) {//1、數據如何獲取String juzi = (String)input.getValueByField("juzi");//2、進行切割String[] strings = juzi.split(" ");//3、發送數據for (String word : strings){//Values 對象幫我們生成一個listcollector.emit(new Values(word,1));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));} }使用Bolt對單詞進行統計,MyWordCountAndPrintBolt的代碼如下:
package cn.toto.strom.wordcount;import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple;import java.util.HashMap; import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-20 16:50*/ public class MyWordCountAndPrintBolt extends BaseBasicBolt {private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();public void execute(Tuple input, BasicOutputCollector collector) {String word = (String) input.getValueByField("word");Integer num = (Integer) input.getValueByField("num");//1、查看單詞對應的value是否存在Integer integer = wordCountMap.get(word);if (integer == null || integer.intValue() == 0) {wordCountMap.put(word,num);}else {wordCountMap.put(word,integer.intValue() + num);}//2、打印數據System.out.println(wordCountMap);}public void declareOutputFields(OutputFieldsDeclarer declarer) {//todo 不需要定義輸出的字段} }使用TopologyDriver串聯spout和bolt進行運行,代碼如下:
package cn.toto.strom.wordcount;/*** Created by toto on 2017/6/20.*/import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder;/*** 代碼說明** @author tuzq* @create 2017-06-20 16:57*/ public class StormTopologyDriver {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {//1、準備任務信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(),1);topologyBuilder.setBolt("bolt1", new MySplitBolt(),4).shuffleGrouping("mySpout");topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任務提交//提交給誰?提交什么內容?Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount", config, stormTopology);//集群模式//StormSubmitter.submitTopology("wordcount1", config, stormTopology);} }如果是集群模式運行,StormTopologyDriver的代碼是:
package cn.toto.strom.wordcount;import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder;/** @author tuzq* @create 2017-06-20 16:57*/ public class StormTopologyDriver {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {//1、準備任務信息TopologyBuilder topologyBuilder = new TopologyBuilder();//使用2個線程來運行topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(),2);//使用4個線程來運行topologyBuilder.setBolt("bolt1", new MySplitBolt(),4).shuffleGrouping("mySpout");//使用2個線程來運行topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任務提交//提交給誰?提交什么內容?Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式//LocalCluster localCluster = new LocalCluster();//localCluster.submitTopology("wordcount", config, stormTopology);//集群模式StormSubmitter.submitTopology("wordcount1", config, stormTopology);} }StormTopologyDriver 的代碼說明:
1.上面有2個worker
2.spout的兩個并行度平均分配在兩個worker上。每個組件的task數量會被平均分配到worker
3.bolt1的4個并行度平均分配在兩個worker上。
4.bolt2的2個并行度平均分配在兩個worker上。
一般將并行度中的每個實例叫做task。默認情況下,一個組件的task數量等于它的并行度(executor數量),例如bolt1的并行度是4,就代表了4個task.
本地模式運行
可以直接右鍵Run運行,最終運行的結果如下:
集群模式運行
在IDEA中對maven項目打包:
由于集群模式下已經有了storm-core-1.1.0.jar(位于Storm安裝目錄的lib目錄下),所以在package之前,要修改pom文件,給storm-core的依賴加上<scope>provided</scope>(如果是本地模式,需要把這一行注釋掉):
如果不修改,將會報如下的錯誤:
Exception in thread "main" java.lang.ExceptionInInitializerErrorat org.apache.storm.config$read_storm_config.invoke(config.clj:78)at org.apache.storm.config$fn__908.invoke(config.clj:100)at org.apache.storm.config__init.load(Unknown Source)at org.apache.storm.config__init.<clinit>(Unknown Source)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:348)at clojure.lang.RT.classForName(RT.java:2154)at clojure.lang.RT.classForName(RT.java:2163)at clojure.lang.RT.loadClassForName(RT.java:2182)at clojure.lang.RT.load(RT.java:436)at clojure.lang.RT.load(RT.java:412)at clojure.core$load$fn__5448.invoke(core.clj:5866)at clojure.core$load.doInvoke(core.clj:5865)at clojure.lang.RestFn.invoke(RestFn.java:408)at clojure.core$load_one.invoke(core.clj:5671)at clojure.core$load_lib$fn__5397.invoke(core.clj:5711)at clojure.core$load_lib.doInvoke(core.clj:5710)at clojure.lang.RestFn.applyTo(RestFn.java:142)at clojure.core$apply.invoke(core.clj:632)at clojure.core$load_libs.doInvoke(core.clj:5753)at clojure.lang.RestFn.applyTo(RestFn.java:137)at clojure.core$apply.invoke(core.clj:634)at clojure.core$use.doInvoke(core.clj:5843)at clojure.lang.RestFn.invoke(RestFn.java:408)at org.apache.storm.command.config_value$loading__5340__auto____12276.invoke(config_value.clj:16)at org.apache.storm.command.config_value__init.load(Unknown Source)at org.apache.storm.command.config_value__init.<clinit>(Unknown Source)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:348)at clojure.lang.RT.classForName(RT.java:2154)at clojure.lang.RT.classForName(RT.java:2163)at clojure.lang.RT.loadClassForName(RT.java:2182)at clojure.lang.RT.load(RT.java:436)at clojure.lang.RT.load(RT.java:412)at clojure.core$load$fn__5448.invoke(core.clj:5866)at clojure.core$load.doInvoke(core.clj:5865)at clojure.lang.RestFn.invoke(RestFn.java:408)at clojure.lang.Var.invoke(Var.java:379)at org.apache.storm.command.config_value.<clinit>(Unknown Source) Caused by: 
java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/tuzq/software/stormInstallPath/servers/apache-storm-1.1.0/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/home/tuzq/software/stormInstallPath/workdir/wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:383)at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:427)at org.apache.storm.utils.Utils.readStormConfig(Utils.java:463)at org.apache.storm.utils.Utils.<clinit>(Utils.java:177)... 39 more Caused by: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/tuzq/software/stormInstallPath/servers/apache-storm-1.1.0/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/home/tuzq/software/stormInstallPath/workdir/wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]at org.apache.storm.utils.Utils.getConfigFileInputStream(Utils.java:409)at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:362)... 42 more注意,如果引入的storm-core的jar包要和集群中的jar包版本是一樣
如果本地部署和集群部署的storm-core版本不一樣,還需要修改代碼中的包名結構(例如0.9.x/0.10.x版本使用backtype.storm包,而1.x版本使用org.apache.storm包),否則將會報錯
接著執行如下:
接著執行下圖的:
進入項目目錄,比如我的:
進入target目錄:
紅框中的jar是帶有其它jar包依賴的jar,上面一個jar是不帶依賴的jar,集群模式運行的時候使用wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar來運行
將wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar上傳到:/home/tuzq/software/stormInstallPath/workdir,執行以下命令:
[root@hadoop1 workdir]# storm jar wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar cn.toto.strom.wordcount.StormTopologyDriver
命令說明:
表示通過storm運行wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar中的cn.toto.strom.wordcount.StormTopologyDriver
運行效果:
通過UI界面查看一下程序在哪兒運行:瀏覽器地址是http://hadoop1:8080/
點擊進入,查看效果:
查看最后結果打印位置
點擊UI界面中的bolt2
進入日志目錄,查看日志結果:
總結
以上是生活随笔為你收集整理的Storm 1.1.0模拟单词统计功能,Spout编写,Bolt编写,TopologyDriver编写,本地模式运行,集群模式运行,集群模式下看输出结果的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Strom集群安裝,Python安裝,S
- 下一篇: Strom程序的并发机制,配置并行度(代