日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm Trident API

發布時間:2025/4/16 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm Trident API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  在Storm Trident中有五種操作類型

  •   Apply Locally:本地操作,所有操作應用在本地節點數據上,不會產生網絡傳輸??? ?
  •   Repartitioning:數據流重定向,單純的改變數據流向,不會改變數據內容,這部分會有網絡傳輸
  •   Aggragation:聚合操作,會有網絡傳輸
  •   Grouped streams上的操作
  •   Merge和Join

一Apply Locally

  1.functions函數操作

  函數的作用是接收一個tuple(需指定接收tuple的哪個字段),輸出0個或多個tuples。輸出的新字段值會被追加到原始輸入tuple的后面,如果一個function不輸出tuple,那就意味這這個tuple被過濾掉了,例如下面的例子:

1 class AddAndSubFuction extends BaseFunction{ 2 3 public void execute(TridentTuple tuple, TridentCollector collector) { 4 int res1 = tuple.getInteger(0); 5 int res2 = tuple.getInteger(1); 6 int sub = res1 > res2 ? res1 - res2 : res2 - res1; 7 collector.emit(new Values(res1+res2,sub)); 8 } 9 }

?

  2.Filter過濾操作

  Filters很簡單,接收一個tuple,并決定是否保留這個tuple,例如

1 class ScoreFilter extends BaseFilter{ 2 3 public boolean isKeep(TridentTuple tuple) { 4 return tuple.getInteger(0) >= 60; 5 } 6 }

  上述Filter過濾調成績小于60的tuple.

  3.partitionAggregate

  PartitionAggregate的作用對每個Partition中的tuple進行聚合,與前面的函數在原tuple后面追加數據不同,PartitionAggregate的輸出會直接替換掉輸入的tuple,僅數據PartitionAggregate中發射的tuple。

  TridentAPI提供了三個聚合器接口:CombinerAggregator,ReducerAggregator,Aggregator

  我們先來看一看CombinerAggregatorCombinerAggregator接口的定義如下:

public interface CombinerAggregator<T> extends Serializable {T init(TridentTuple tuple);T combine(T val1, T val2);T zero(); }

?

  CombinerAggregator接口只返回一個tuple,并且這個tuple也只包含一個field。init方法會先執行,它負責預處理每一個接收到的tuple,然后再執行combine函數來計算收到的tuples直到最后一個tuple到達,當所有tuple處理完時,CombinerAggregator會發射zero函數的輸出,比如CombinerAggregator的實現類Count的定義如下:

public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}

?

  當你使用aggregate?方法代替PartitionAggregate時,CombinerAggregator的好處就體現出來了,因為Trident會自動優化計算,在網絡傳輸tuples之前做局部聚合。

  我們再來看一下ReducerAggregatorReducerAggregator的定義如下:

public interface ReducerAggregator<T> extends Serializable {T init();T reduce(T curr, TridentTuple tuple); }

?

  ReducerAggregator通過init方法提供一個初始值,然后為輸入的每個tuple迭代這個值,最終產生一個唯一的tuple并輸出,定義一個實例如下:

public class ReducerCount implements ReducerAggregator<Long>{@Overridepublic Long init() {return 0L;}@Overridepublic Long reduce(Long curr, TridentTuple tuple) {return curr + 1;}}

?

  最后看一下通用的聚合器Aggregator,它的定義如下:

public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }

  Aggregator接口可以發射含任意數量屬性的任意數據量的tuples,并且可以在執行過程中的任何時候發射:
  init:在處理數據之前被調用,它的返回值會作為一個狀態值傳遞給aggregate和complete方法
  aggregate:用來處理每一個輸入的tuple,它可以更新狀態值也可以發射tuple
  complete:當所有tuple都被處理完成后被調用

  有時候我們需要執行多個聚合器,這在Trident中稱為chaining

  4.projection投影操作

  投影操作的作用是僅僅保留stream指定字段的數據,和關系數據庫中投影的概念類似

二Repartitioning重定向操作

  重定向操作是如何在各個任務間對tuples進行分區。分區的數量也有可能改變重定向的結果。重定向需要網絡傳輸,下面介紹下重定向函數:

  • shuffle:通過隨機分配算法來均衡tuple到各個分區
  • broadcast:每個tuple都被廣播到所有的分區,這種方式在drcp時非常有用,比如在每個分區上做stateQuery
  • partitionBy:根據指定的字段列表進行劃分,具體做法是用指定字段列表的hash值對分區個數做取模運算,確保相同字段列表的數據被劃分到同一個分區
  • global:所有的tuple都被發送到一個分區,這個分區用來處理整個Stream
  • batchGlobal:一個Batch中的所有tuple都被發送到同一個分區,不同的Batch會去往不同的分區
  • Partition:通過一個自定義的分區函數來進行分區,這個自定義函數實現了?backtype.storm.grouping.CustomStreamGrouping
  • ?

    三Aggragation聚合操作

      Trident有aggregate和 persistentAggregate方法來做聚合操作。aggregate是獨立的運行在Stream的每個Batch上的,而persistentAggregate則是運行在Stream的所有Batch上并把運算結果存儲在state source中。 運行aggregate方法做全局聚合。當你用到 ReducerAggregator或Aggregator時,Stream首先被重定向到一個分區中,然后其中的聚合函數便在這個分區上運行。當你用到CombinerAggregator時,Trident會首先在每個分區上做局部聚合,然后把局部聚合后的結果重定向到一個分區,因此使用CombinerAggregator會更高效,可能的話我們需要優先考慮使用它。

    四Grouped streams

      GroupBy操作是根據特定的字段對流進行重定向的,還有,在一個分區內部,每個相同字段的tuple也會被Group到一起如果你在grouped Stream上面運行aggregators,聚合操作會運行在每個Group中而不是整個Batch。persistentAggregate也能運行在GroupedSteam上,不過結果會被保存在MapState中,其中的key便是分組的字段。 當然,aggregators在GroupedStreams上也可以串聯。

    五Merge和Join

      api的最后一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個流。我們可以這么做:  

    topology.merge(stream1, stream2, stream3);

    ?

      另一種合并流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,因此,join只針對符合條件的Stream。join應用在來自Spout的每一個小Batch中。join時候的tuple會包含:
      1.join的字段,如Stream1中的key和Stream2中的x

      2.所有非join的字段,根據傳入join方法的順序,a和b分別代表steam1的val1和val2,c代表Stream2的val1

      當join的是來源于不同Spout的stream時,這些Spout在發射數據時需要同步,一個Batch所包含的tuple會來自各個Spout。

    ?

    ?

    ?

    ?

    轉載于:https://www.cnblogs.com/senlinyang/p/8081447.html

    總結

    以上是生活随笔為你收集整理的Storm Trident API的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。