日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm的StreamID使用样例(版本1.0.2)

發布時間:2025/7/14 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm的StreamID使用样例(版本1.0.2) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

隨手嘗試了一下StreamID的的用法。留個筆記。

?

==數據樣例==

{"Address": "小橋鎮小橋中學對面","CityCode": "511300","CountyCode": "511322","EnterpriseCode": "YUNDA","MailNo": "667748320345","Mobile": "183****5451","Name": "王***","ProvCode": "510000","Weight": "39" }

?

==拓撲結構==

?

==程序源碼==

<Spout1>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import common.simulate.DataRandom; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;import java.util.Map;public class Spout1 extends BaseRichSpout {private SpoutOutputCollector _collector = null;private DataRandom _dataRandom = null;private int _timeInterval = 1000;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream1", new Fields("json"));declarer.declareStream("Stream2", new Fields("json"));}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_dataRandom = DataRandom.getInstance();if (conf.containsKey(Constants.SpoutInterval)) {_timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));}}@Overridepublic void nextTuple() {try {Thread.sleep(_timeInterval);} catch (InterruptedException e) {e.printStackTrace();}JSONObject jsonObject = _dataRandom.getRandomExpressData();System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");_collector.emit("Stream1", new Values(jsonObject.toJSONString()));_collector.emit("Stream2", new Values(jsonObject.toJSONString()));} }

?

<CountBolt1>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map;public class CountBolt1 extends BaseRichBolt {private OutputCollector _collector = null;private int taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream3", new Fields("company", "count"));}@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {_collector = collector;taskId = context.getThisTaskId();}@Overridepublic void execute(Tuple input) {String str = input.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String company = jsonObject.getString(Constants.EnterpriseCode);int count = 0;if (_map.containsKey(company)) {count = _map.get(company);}count++;_map.put(company, count);_collector.emit("Stream3", new Values(company, count));System.out.print("[---CountBolt1---]" +"taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");} }

?

<CountBolt2>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt2 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String prov = jsonObject.getString(Constants.ProvCode);int count = 0;if (_map.containsKey(prov)) {count = _map.get(prov);}count++;_map.put(prov, count);_collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));System.out.print("[---CountBolt2---]" +"taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));} }

?

<CountBolt3>

package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt3 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String city = jsonObject.getString(Constants.CityCode);int count = 0;if (_map.containsKey(city)) {count = _map.get(city);}count++;_map.put(city, count);_collector.emit("Stream4", new Values(city, count));System.out.print("[---CountBolt3---]" +"taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));} }

?

<TopBolt>

package test;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;import java.util.List; import java.util.Map;public class TopBolt extends BaseRichBolt {@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {}@Overridepublic void execute(Tuple tuple) {System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");List<Object> values = tuple.getValues();for(Object value : values) {System.out.print("[---TopBolt---]value=" + value + "\n");}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {} }

?

<TestTopology>

package test;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class TestTopology {public static void main(String[] args)throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("Spout1", new Spout1());builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Top", new TopBolt()).fieldsGrouping("Count1", "Stream3", new Fields("company")).fieldsGrouping("Count2", "Stream4", new Fields("prov")).fieldsGrouping("Count3", "Stream4", new Fields("city"));Config config = new Config();config.setNumWorkers(1);config.put(common.constants.Constants.SpoutInterval, args[1]);if (Boolean.valueOf(args[0])) {StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());} else {LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("TestTopology1", config, builder.createTopology());}} }

?

==結果日志==

[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小橋鎮小橋中學對面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"} [---CountBolt1---]taskId=1, company=YUNDA, count=1 [---CountBolt3---]taskId=3, city=511300, count=1 [---CountBolt2---]taskId=2, prov=510000, count=1 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=510000 [---TopBolt---]value=1 [---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=511300 [---TopBolt---]value=1 [---TopBolt---]StreamID=Stream3 [---TopBolt---]value=YUNDA [---TopBolt---]value=1

?

轉載于:https://www.cnblogs.com/quchunhui/p/8302192.html

總結

以上是生活随笔為你收集整理的Storm的StreamID使用样例(版本1.0.2)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: xxxxwwww在线观看 | 91av视频在线观看 | www.久久爱 | 午夜精品久久久久久久99老熟妇 | 狠狠操天天干 | 国产免费看片 | 骚虎视频最新网址 | 国产高清在线视频观看 | 中文字幕日韩一区二区三区不卡 | 国产在线观看网站 | 浴室里强摁做开腿呻吟男男 | 免费观看黄色av | 亚洲精品视频免费在线观看 | 夜夜嗨网站 | 奇米影视一区二区三区 | 国产欧美日 | 网站av在线 | 海角社区登录 | 白石茉莉奈中文字幕在 | 农村一级毛片 | 一本色道久久亚洲综合精品蜜桃 | 香蕉视频在线观看黄 | av大片免费在线观看 | 欧美日韩有码 | 亚洲熟妇毛茸茸 | japanese强行粗暴 | 日韩中文字幕一区 | 亚洲欧洲中文 | 欧美一性一乱一交一视频 | 狠狠夜夜| 手机看片欧美日韩 | 亚洲色图第三页 | 一本久道久久综合无码中文 | 久久久999国产精品 天堂av中文在线 | 青青草国产精品 | 青青操视频在线观看 | 伊人性视频 | 日韩一级av毛片 | 亚洲av无码乱码在线观看富二代 | 久久免费看视频 | 欧美午夜激情视频 | 欧美视频在线观看 | 亚洲交性网 | 一本一道波多野结衣av黑人 | 国产高潮又爽又无遮挡又免费 | 亚洲国产中文字幕在线 | 爆操巨乳美女 | 国产成人区 | 91网在线观看 | 青青草视频在线免费观看 | 欧美一区亚洲一区 | 亚洲视频h| 久久国产福利一区 | 国家队动漫免费观看在线观看晨光 | 久久精品国产av一区二区三区 | 毛片.com| 中文字幕在线官网 | 午夜久久久 | 国产精品一二三区 | 四虎国产成人永久精品免费 | 青娱乐极品在线 | 欧美激情电影一区二区 | 日韩亚洲精品在线 | 精品久久国产 | 97超碰碰| 农村妇女毛片精品久久久 | 泰坦尼克号3小时49分的观看方法 | 反差在线观看免费版全集完整版 | 青娱乐免费在线视频 | 欧美色偷偷 | 可以免费观看av的网站 | 乌克兰毛片 | 亚洲va天堂va国产va久 | 久久久久久久久免费 | 国产一区二区av在线 | 日本h片在线观看 | 亚洲av成人无码久久精品老人 | 曰批女人视频在线观看 | 免费特级黄毛片 | 欧美日韩一区电影 | 开心激情婷婷 | 美女的诞生免费观看在线高清 | 国产精品高清网站 | 国内精品久久久久久 | 婷婷影音 | 日韩不卡在线观看 | 国产美女久久 | 亚洲图片一区二区三区 | 2024av| 国产一级啪啪 | 丁香花电影高清在线阅读免费 | 日本不卡视频在线播放 | 精品国产乱码久久久久久久 | 狠狠干av | av免费观看网站 | 污污的视频软件 | 亚洲h| 欧美精品一区二区三区三州 | 午夜精品视频在线 |