Storm Development

Published: 2023/12/18
We start with a simple example from the Storm development guide.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import org.apache.log4j.Logger;
import java.util.Map;
import java.util.HashMap;
import java.util.Random;

public class ExclamationTopology {

    // Spout that emits one randomly chosen word roughly every 100 ms.
    public static class TestWordSpout extends BaseRichSpout {
        public static Logger LOG = Logger.getLogger(TestWordSpout.class);
        boolean _isDistributed;
        SpoutOutputCollector _collector;

        public TestWordSpout() {
            this(true);
        }

        public TestWordSpout(boolean isDistributed) {
            _isDistributed = isDistributed;
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            // Keep the collector so nextTuple() can emit through it.
            _collector = collector;
        }

        @Override
        public void close() {
        }

        @Override
        public void nextTuple() {
            Utils.sleep(100);
            final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
            final Random rand = new Random();
            final String word = words[rand.nextInt(words.length)];
            // No message id is passed, so Storm does not track this tuple
            // and ack()/fail() are never called for it.
            _collector.emit(new Values(word));
        }

        @Override
        public void ack(Object msgId) {
        }

        @Override
        public void fail(Object msgId) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            if (!_isDistributed) {
                // Limit this spout to a single task when not distributed.
                Map<String, Object> ret = new HashMap<String, Object>();
                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
                return ret;
            } else {
                return null;
            }
        }
    }

    // Bolt that appends "!!!" to the first field of every input tuple.
    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Anchor the output tuple to the input tuple, then ack the input.
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                .shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // A topology name was given: submit to a real cluster.
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // No arguments: run in local mode for 10 seconds, then shut down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}



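A few notes
1. Spout implementation
  Storm uses tuples as its data model. A tuple is a named list of values, and each field in a tuple can be an object of any type. Storm supports all primitive types, strings, and byte arrays as tuple field values. To use a type you defined yourself, you only need to implement and register a serializer for it. Every node must also declare the field names of the tuples it emits.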
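
To make the "named list of values" idea concrete, here is a minimal sketch in plain Java. NamedTuple is a hypothetical stand-in written for this illustration and is not a Storm class; it only mirrors the idea that field names are declared once (as in declareOutputFields) and values are then read either by position or by field name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Storm tuple: declared field names plus a
// positional list of values. Not part of the Storm API.
final class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("one value is required per declared field");
        }
        this.fields = fields;
        this.values = values;
    }

    // Positional access, analogous to tuple.getString(0) in ExclamationBolt.
    Object getValue(int index) {
        return values.get(index);
    }

    // Access by declared field name, for example "word".
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple tuple = new NamedTuple(
                Arrays.<String>asList("word"), Arrays.<Object>asList("nathan"));
        System.out.println(tuple.getValue(0) + "!!!");
        System.out.println(tuple.getValueByField("word"));
    }
}
```

In the real topology the field names come from declarer.declare(new Fields("word")) and the values from new Values(word); the sketch only shows how the two line up.
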
  A spout either extends BaseRichSpout or implements IRichSpout directly (IRichSpout combines the ISpout and IComponent interfaces). Implementing a spout mainly means implementing the following methods (see TestWordSpout above):
  void open(java.util.Map conf,
            TopologyContext context,
            SpoutOutputCollector collector)
     Called when a task for this spout is initialized inside a worker process on the cluster. It provides the environment the spout needs to run.
     Parameters
        conf - the Storm configuration for this spout
        context - an object used to obtain information about this spout task, including the task id, the component id, input and output information, and so on
        collector - used to emit tuples from this spout. Tuples can be emitted at any time, including from the open and close methods. The collector is thread-safe and should be saved as an instance variable of the spout object.

   void declareOutputFields(OutputFieldsDeclarer declarer)
   Declares the schema of the streams this component emits into the topology.
   declarer - used to declare the output stream ids, the output fields, and whether each output stream is a direct stream

  java.util.Map<java.lang.String,java.lang.Object> getComponentConfiguration()
    Declares configuration specific to this component.

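 void ack(java.lang.Object msgId)
 Storm calls this to tell the spout that the tuple it emitted with message id msgId has been fully processed.
 void activate()
 Called when the spout moves from deactivated to activated mode. Storm then starts calling nextTuple to emit data.
 void close()
 Called when the spout is shut down.
 void deactivate()
 Called when the spout moves from activated to deactivated mode. Storm stops calling nextTuple while the spout is deactivated.
 void fail(java.lang.Object msgId)
 Storm calls this to tell the spout that the tuple it emitted with message id msgId failed to be fully processed. An implementation typically puts the message back on its queue so that it is replayed some time later.
   void nextTuple()
   When this method is called, Storm is requesting that the spout emit tuples to the output collector. The method should not block. When there is nothing to emit, it should return, usually after a short sleep so that it does not waste CPU.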
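
The ack and fail callbacks only matter when the spout emits each tuple together with a message id, which TestWordSpout does not do. As a rough sketch of the bookkeeping a replaying spout might keep, here is a plain-Java helper. PendingMessages and all of its method names are hypothetical and are not part of Storm; in a real spout, take() would be called from nextTuple() and the returned id passed as the message id when emitting.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical helper, not part of Storm: tracks messages that were emitted
// but not yet acked, and re-queues them when they fail.
final class PendingMessages {
    private final Queue<String> toSend = new ArrayDeque<String>();
    private final Map<Long, String> inFlight = new HashMap<Long, String>();
    private long nextId = 0;

    // Adds a new message that should eventually be emitted.
    void offer(String message) {
        toSend.add(message);
    }

    // For use from nextTuple(): returns the id to emit with, or -1 if idle.
    long take() {
        String message = toSend.poll();
        if (message == null) {
            return -1;
        }
        long id = nextId++;
        inFlight.put(id, message);
        return id;
    }

    // Returns the payload that belongs to an in-flight message id.
    String payload(long msgId) {
        return inFlight.get(msgId);
    }

    // For use from ack(msgId): processing finished, so forget the message.
    void ack(long msgId) {
        inFlight.remove(msgId);
    }

    // For use from fail(msgId): put the message back so it is sent again.
    void fail(long msgId) {
        String message = inFlight.remove(msgId);
        if (message != null) {
            toSend.add(message);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    int queuedCount() {
        return toSend.size();
    }

    public static void main(String[] args) {
        PendingMessages pending = new PendingMessages();
        pending.offer("nathan");
        long id = pending.take();
        pending.fail(id);                          // simulated failure
        long replayId = pending.take();            // same payload, new id
        System.out.println(pending.payload(replayId));
    }
}
```

A failed message gets a fresh id when it is replayed, which keeps the in-flight map free of stale entries. Whether to cap the number of retries is a separate design decision that this sketch leaves out.
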

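2. Bolt implementation

The three most important methods are listed below. The remaining component-interface methods work the same way as for a spout and are not repeated here.
void prepare(java.util.Map stormConf,
             TopologyContext context,
             OutputCollector collector)
 Similar to the spout's open method. It is called when the bolt component is initialized and provides the environment the bolt needs.

void execute(Tuple input)
    Processes a single input tuple. The Tuple object carries metadata about which component, stream, and task it came from. Tuple values are accessed with Tuple#getValue. A bolt does not have to process a tuple immediately; it can hold on to it and process it at a suitable later time. The bolt emits tuples through the OutputCollector it received in prepare. Every input tuple must eventually be acknowledged with OutputCollector#ack or OutputCollector#fail to tell Storm whether processing succeeded or failed; otherwise Storm cannot determine whether the tuple emitted by the spout has been fully processed.

void cleanup()
     Called when the bolt is about to shut down. There is no guarantee that it will be called; for example, it is not called when the worker process is killed with kill -9. It is mainly useful for cleaning up in local mode.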

3. Building the topology
  Building a topology is straightforward and is done with TopologyBuilder, as shown in the main method of the example. TopologyBuilder#setSpout adds a spout to the topology and TopologyBuilder#setBolt adds a bolt.
  Among them:
  public BoltDeclarer setBolt(java.lang.String id,
                              IBasicBolt bolt,
                              java.lang.Number parallelism_hint)
  id - the unique id of this component; components that want to consume this component's output streams refer to it by this id
  bolt - the bolt that processes data at this node
  parallelism_hint - the number of tasks that should execute this bolt; each task runs on a thread in some process somewhere in the cluster

  The signature above is the IBasicBolt overload. ExclamationBolt extends BaseRichBolt, so the example uses the matching IRichBolt overload, whose parameters have the same meaning.

  BoltDeclarer provides many rules for how tuples are routed from one node to another. In the example, builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("word") sets ExclamationBolt as the bolt of the exclaim1 node with a parallelism of 3, and routes tuples from the word node to the exclaim1 node with shuffle grouping (random distribution).
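
As an illustration of what random distribution across tasks can look like, here is one possible scheme in plain Java: walk through a shuffled list of task indices and reshuffle after each full pass, so the order is random but every task receives the same number of tuples per pass. ShuffleChooser is a hypothetical class written for this sketch; it is not Storm's implementation and makes no claim about how Storm does this internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of an even, randomized task chooser. Not Storm code.
final class ShuffleChooser {
    private final List<Integer> order = new ArrayList<Integer>();
    private final Random random;
    private int position = 0;

    ShuffleChooser(int numTasks, Random random) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        for (int i = 0; i < numTasks; i++) {
            order.add(i);
        }
        this.random = random;
        Collections.shuffle(order, random);
    }

    // Returns the index of the task that should receive the next tuple.
    int nextTask() {
        if (position == order.size()) {
            // One full pass is done: pick a new random order for the next pass.
            Collections.shuffle(order, random);
            position = 0;
        }
        return order.get(position++);
    }

    public static void main(String[] args) {
        // Three tasks, matching the parallelism of exclaim1 in the example.
        ShuffleChooser chooser = new ShuffleChooser(3, new Random());
        int[] counts = new int[3];
        for (int i = 0; i < 300; i++) {
            counts[chooser.nextTask()]++;
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Because 300 is a multiple of 3 and every pass visits each task exactly once, each of the three counters ends at 100 even though the order inside a pass is random.
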

  public SpoutDeclarer setSpout(java.lang.String id,
                              IRichSpout spout,
                              java.lang.Number parallelism_hint)

        id - the unique id of this component; components that want to consume this component's output streams refer to it by this id
        spout - the spout instance
        parallelism_hint - the number of tasks that should execute this spout; each task runs on a thread in some process somewhere in the cluster

Reposted from: https://www.cnblogs.com/flyingwhitepig/archive/2012/12/30/5874080.html
