日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

storm入门——本地模式helloworld

發(fā)布時(shí)間:2023/12/19 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm入门——本地模式helloworld 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

創(chuàng)建maven項(xiàng)目,在pom.xml中加入以下配置:

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><type>jar</type><version>0.9.3-rc1</version></dependency>

創(chuàng)建SimpleSpout類用于獲取數(shù)據(jù)流:

1 package com.hirain.storm.helloworld; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class SimpleSpout extends BaseRichSpout{ 14 15 /** 16 * 17 */ 18 private static final long serialVersionUID = 1L; 19 20 //用來(lái)發(fā)射數(shù)據(jù)的工具類 21 private SpoutOutputCollector collector; 22 23 private static String[] info = new String[]{ 24 "comaple\t,12424,44w46,654,12424,44w46,654,", 25 "lisi\t,435435,6537,12424,44w46,654,", 26 "lipeng\t,45735,6757,12424,44w46,654,", 27 "hujintao\t,45735,6757,12424,44w46,654,", 28 "jiangmin\t,23545,6457,2455,7576,qr44453", 29 "beijing\t,435435,6537,12424,44w46,654,", 30 "xiaoming\t,46654,8579,w3675,85877,077998,", 31 "xiaozhang\t,9789,788,97978,656,345235,09889,", 32 "ceo\t,46654,8579,w3675,85877,077998,", 33 "cto\t,46654,8579,w3675,85877,077998,", 34 "zhansan\t,46654,8579,w3675,85877,077998,"}; 35 36 Random random=new Random(); 37 38 39 /** 40 * 在SpoutTracker類中被調(diào)用,每調(diào)用一次就可以向storm集群中發(fā)射一條數(shù)據(jù)(一個(gè)tuple元組),該方法會(huì)被不停的調(diào)用 41 */ 42 public void nextTuple() { 43 try { 44 String msg = info[random.nextInt(11)]; 45 // 調(diào)用發(fā)射方法 46 collector.emit(new Values(msg)); 47 // 模擬等待100ms 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 } 53 /** 54 * 初始化collector 55 */ 56 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 57 this.collector = collector; 58 59 } 60 61 62 /** 63 * 定義字段id,該id在簡(jiǎn)單模式下沒(méi)有用處,但在按照字段分組的模式下有很大的用處。 64 * 該declarer變量有很大作用,我們還可以調(diào)用declarer.declareStream();來(lái)定義stramId,該id可以用來(lái)定義更加復(fù)雜的流拓?fù)浣Y(jié)構(gòu) 65 */ 66 public void declareOutputFields(OutputFieldsDeclarer declarer) { 67 declarer.declare(new Fields("source")); //collector.emit(new Values(msg));參數(shù)要對(duì)應(yīng) 68 } 69 70 }

創(chuàng)建SimpleBolt類,用于處理數(shù)據(jù):

1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt { 13 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 1L; 18 19 public void execute(Tuple input,BasicOutputCollector collector) { 20 try { 21 String msg = input.getString(0); 22 if (msg != null){ 23 //System.out.println("msg="+msg); 24 collector.emit(new Values(msg + "msg is processed!")); 25 } 26 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 31 } 32 33 public void declareOutputFields( 34 OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("info")); 36 37 } 38 39 }

創(chuàng)建main方法配置storm的topology并啟動(dòng)本地模式運(yùn)行:

1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 8 public class SimpleTopology { 9 10 11 public static void main(String[] args) { 12 try { 13 // 實(shí)例化TopologyBuilder類。 14 TopologyBuilder topologyBuilder = new TopologyBuilder(); 15 // 設(shè)置噴發(fā)節(jié)點(diǎn)并分配并發(fā)數(shù),該并發(fā)數(shù)將會(huì)控制該對(duì)象在集群中的線程數(shù)。 16 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); 17 // 設(shè)置數(shù)據(jù)處理節(jié)點(diǎn)并分配并發(fā)數(shù)。指定該節(jié)點(diǎn)接收噴發(fā)節(jié)點(diǎn)的策略為隨機(jī)方式。 18 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); 19 Config config = new Config(); 20 config.setDebug(true); 21 if (args != null && args.length > 0) { 22 config.setNumWorkers(1); 23 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); 24 } else { 25 // 這里是本地模式下運(yùn)行的啟動(dòng)代碼。 26 config.setMaxTaskParallelism(1); 27 LocalCluster cluster = new LocalCluster(); 28 cluster.submitTopology("simple", config, topologyBuilder.createTopology()); 29 } 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }

以上為storm的簡(jiǎn)單的helloworld,僅供參考

轉(zhuǎn)載于:https://www.cnblogs.com/zhangyukun/p/4031066.html

總結(jié)

以上是生活随笔為你收集整理的storm入门——本地模式helloworld的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。