
【2019 Spring Recruitment Prep: 108. Storm (3)】


8. DRPC

  • 8.1 RPC(hadoop)

RPC (remote procedure call): a call that goes across the network (spanning the transport and application layers) and across process boundaries.

Hadoop RPC
relies on the RPC classes bundled in the hadoop-client dependency.
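
A minimal sketch (not from these notes) of what a Hadoop RPC service and client look like, assuming hadoop-client is on the classpath; the protocol interface, port, and addresses below are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.net.InetSocketAddress;

public class HadoopRpcSketch {

    // The protocol is a plain Java interface plus a version constant that Hadoop reads reflectively.
    public interface GreetingProtocol {
        long versionID = 1L;
        String greet(String name);
    }

    public static class GreetingServer implements GreetingProtocol {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Server side: bind the implementation to an address and start listening.
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(GreetingProtocol.class)
                .setInstance(new GreetingServer())
                .setBindAddress("0.0.0.0")
                .setPort(9999)
                .build();
        server.start();

        // Client side: obtain a proxy over the network and call it like a local object.
        GreetingProtocol proxy = RPC.getProxy(
                GreetingProtocol.class,
                GreetingProtocol.versionID,
                new InetSocketAddress("localhost", 9999),
                conf);
        System.out.println(proxy.greet("storm"));

        RPC.stopProxy(proxy);
        server.stop();
    }
}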

  • 8.2 DRPC programming in Storm local mode

DRPC is not so much a Storm-only feature as a pattern: it can be used on its own or combined with Storm topologies ("which will be awesome", per the official docs).
A DRPC server does the coordination:
it receives an RPC request, hands it to a topology, and returns the results the topology produces to the client (essentially the same flow as an ordinary RPC call).

In the end, results are matched back to their requests by id.
Previously, without remote invocation, we simply created a TopologyBuilder with new; for distributed remote invocation the class to use is LinearDRPCTopologyBuilder (see the sketch below).
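
A minimal local-mode sketch, adapted from the standard DRPC example in the Storm documentation (the function name "exclamation" and the ExclaimBolt are illustrative). LinearDRPCTopologyBuilder puts the request id in the first tuple field and expects each bolt to emit [id, result], which is how a result gets matched back to its caller:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LocalDrpcSketch {

    // DRPC bolts receive [request-id, argument] and must emit [request-id, result].
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Object requestId = tuple.getValue(0);
            String arg = tuple.getString(1);
            collector.emit(new Values(requestId, arg + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static void main(String[] args) {
        // "exclamation" is the DRPC function name that clients invoke.
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        // Local mode: LocalDRPC stands in for a real DRPC server.
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpc-demo", new Config(), builder.createLocalTopology(drpc));

        // execute() blocks until the topology returns the result for this request id.
        System.out.println(drpc.execute("exclamation", "hello"));

        cluster.shutdown();
        drpc.shutdown();
    }
}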

9. Integrating Storm with surrounding frameworks

Storm can be integrated with a great many frameworks.

  • 9.1 redis

storm-redis already provides ready-made implementation classes: RedisLookupBolt, RedisStoreBolt, RedisFilterBolt.

Once again a word-count example, except the final result is written to the Redis instance at ziboris3:6379 and inspected with RDM (Redis Desktop Manager).

See LocalWordCountRedisStormTopology.java at the end of this post.

  • 9.2 jdbc

ConnectionProvider
JdbcMapper
JdbcInsertBolt (the bolt that writes into a table)

These three pieces are wired together as in the sketch below.
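
A hedged sketch of the wiring, following the storm-jdbc README; the MySQL URL, credentials, and the wordcount table name are placeholders rather than something from these notes:

import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;

import java.util.HashMap;
import java.util.Map;

public class JdbcBoltSketch {

    public static JdbcInsertBolt buildInsertBolt() {
        // ConnectionProvider: pooled JDBC connections via HikariCP (placeholder URL/credentials).
        Map<String, Object> hikariConfigMap = new HashMap<>();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "password");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // JdbcMapper: maps tuple fields to the table's columns based on the table schema.
        String tableName = "wordcount";
        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

        // JdbcInsertBolt: writes each incoming tuple into the table.
        return new JdbcInsertBolt(connectionProvider, jdbcMapper)
                .withTableName(tableName)
                .withQueryTimeoutSecs(30);
    }
}

The resulting bolt is attached to a topology the same way as the Redis bolt further down, e.g. builder.setBolt("JdbcInsertBolt_", buildInsertBolt()).shuffleGrouping("CountBolt_");.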

  • 9.3 hdfs

To be updated.

  • 9.4 hbase

To be updated.

  • 9.5 elasticSearch

To be updated.

=======================================================================
LocalWordCountRedisStormTopology.java

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Word-frequency count implemented with Storm, with results stored in Redis.
 */
public class LocalWordCountRedisStormTopology {

    public static class DataSourceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        public static final String[] words = new String[]{"apple", "orange", "banana", "pinaple"};

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // Emit a random word every second.
            Random random = new Random();
            String word = words[random.nextInt(words.length)];
            this.collector.emit(new Values(word));
            System.out.println("emit:" + word);
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line_"));
        }

        @Override
        public void ack(Object msgId) {
            super.ack(msgId);
            System.out.println(msgId);
        }
    }

    /**
     * Splits the incoming data and emits the individual words.
     */
    public static class SplitBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Business logic: split the line on ",".
         * (The spout already emits single words here, so the value is forwarded as-is.)
         *
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("line_");
            this.collector.emit(new Values(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word_"));
        }
    }

    /**
     * Word-count aggregation bolt.
     */
    public static class CountBolt extends BaseRichBolt {
        Map<String, Integer> map = new HashMap<>();
        private OutputCollector collector; // keeps emitting downstream, towards the Redis bolt

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Business logic:
         * 1. get each word
         * 2. aggregate the counts of all words
         * 3. emit the result
         *
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word_");
            Integer count = map.get(word);
            if (count == null) {
                count = 1;
            } else {
                count++;
            }
            map.put(word, count); // put() overwrites the existing entry for the same key
            this.collector.emit(new Values(word, map.get(word)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word_", "count_"));
        }
    }

    /**
     * Mapper: tells RedisStoreBolt how to turn a tuple into an entry of the Redis hash "wc".
     */
    public static class WordCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "wc";

        public WordCountStoreMapper() {
            description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        @Override
        public String getKeyFromTuple(ITuple iTuple) {
            return iTuple.getStringByField("word_");
        }

        @Override
        public String getValueFromTuple(ITuple iTuple) {
            return iTuple.getIntegerByField("count_") + "";
        }
    }

    public static void main(String[] args) {
        // Build the topology.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout_", new DataSourceSpout());
        builder.setBolt("SplitBolt_", new SplitBolt()).shuffleGrouping("DataSourceSpout_");
        builder.setBolt("CountBolt_", new CountBolt()).shuffleGrouping("SplitBolt_");

        // Redis sink.
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("192.168.200.203")
                .setPort(6379)
                .build();
        WordCountStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        builder.setBolt("RedisStoreBolt_", storeBolt).shuffleGrouping("CountBolt_");

        // Create a local cluster and submit.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology());
    }
}
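
Since WordCountStoreMapper declares a Redis HASH with key "wc", every word count ends up as a field of that single hash. Besides RDM, the output can be checked directly from redis-cli (host/port as configured above):

redis-cli -h 192.168.200.203 -p 6379
HGETALL wc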
