日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

storm hook的使用

發布時間:2024/1/23 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm hook的使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

storm hook的使用

@(STORM)[storm]

  • storm hook的使用
    • 一原理
    • 二入門例子
    • 三hook的類型
    • 四應用場景

(一)原理

1、先看一下storm的hook是什么東西: http://storm.apache.org/documentation/Hooks.html

Storm provides hooks with which you can insert custom code to run on any number of events within Storm. You create a hook by extending the BaseTaskHook class and overriding the appropriate method for the event you want to catch. There are two ways to register your hook:
In the open method of your spout or prepare method of your bolt using the TopologyContext method.
Through the Storm configuration using the “topology.auto.task.hooks” config. These hooks are automatically registered in every spout or bolt, and are useful for doing things like integrating with a custom monitoring system.

2、storm的hook也是一個典型的鉤子,當某些事情發生時(比如說執行execute方法,執行ack方法時),相應的代碼會自動被調用。

3、通過繼承BaseTaskHook,并覆蓋其方法來創建一個hook。

4、如何把hook注冊到一個拓撲中,有2種方法:
(1)在spout的open方法或者bolt的prepare方法中調用:
TopologyContext.addTaskHook(new **Hook())
(2)在storm的配置文件中修改topology.auto.task.hooks項,這會自己注冊到每一個spout和bolt。這種情況對于一些集成應用或者監控之類的有用。

(二)入門例子

1、先創建hook
這個hook很簡單,就是當execute或者ack方法被調用時,將相應的信息打印出來:

package com.lujinhong.demo.storm.hook;import backtype.storm.hooks.BaseTaskHook; import backtype.storm.hooks.info.BoltAckInfo; import backtype.storm.hooks.info.BoltExecuteInfo;public class TraceTaskHook extends BaseTaskHook {@Overridepublic void boltExecute(BoltExecuteInfo info) {super.boltExecute(info);System.out.println("executingTaskId:" + info.executingTaskId);System.out.println("executedLatencyMs:" + info.executeLatencyMs);System.out.println("execute msg:" + info.tuple.getString(0));}@Overridepublic void boltAck(BoltAckInfo info) {super.boltAck(info);System.out.println("ackingTaskId:" + info.ackingTaskId);System.out.println("processLatencyMs:" + info.processLatencyMs);System.out.println("ack msg:" + info.tuple.getString(0));}}

2、創建topo,并且在topo中注冊鉤子

(1)拓撲很簡單,就是storm-starter中的ExlamationTopoloty,它的spout會隨機發送名字,然后經過2個bolt,每個bolt均在后面加!!!,最后10S后將拓撲kill掉。
(2)然后在bolt中定義hook:

@Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;context.addTaskHook(new TraceTaskHook()); }

完整代碼如下:

package com.lujinhong.demo.storm.hook;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map;public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;context.addTaskHook(new TraceTaskHook());}@Overridepublic void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 10);builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}} }

3、運行結果
類似于以下的內容

ackingTaskId:5 processLatencyMs:null ack msg:nathan!!! executingTaskId:5 executedLatencyMs:null execute msg:nathan!!!

(三)hook的類型

storm在BaseTaskHook中支持的類型可見下面的類定義。一般而言,就是在topo發生某項事件(如發射,確認,cleanup等)執行一些操作

public class BaseTaskHook implements ITaskHook {@Overridepublic void prepare(Map conf, TopologyContext context) {}@Overridepublic void cleanup() {} @Overridepublic void emit(EmitInfo info) {}@Overridepublic void spoutAck(SpoutAckInfo info) {}@Overridepublic void spoutFail(SpoutFailInfo info) {}@Overridepublic void boltAck(BoltAckInfo info) {}@Overridepublic void boltFail(BoltFailInfo info) {}@Overridepublic void boltExecute(BoltExecuteInfo info) {} }

(四)應用場景

1、可以用于調試代碼,比如確認某個方法(execute, prepare等)是否被調用

2、用于將topo中的一些信息發送出去,可以作為監控,日志等。

總結

以上是生活随笔為你收集整理的storm hook的使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。