日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

trident API指南

發布時間:2024/1/23 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 trident API指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

trident API指南

@(STORM)[storm]

  • trident API指南
  • 零 概述
    • 1 本地分區操作
    • 2 重新分區操作
    • 3 聚合操作
    • 4 流分組操作
    • 5合并與連接
  • 一 本地分區操作
    • 一 函數
    • 二 filter
    • 三 分區聚合
      • 1Aggregator接口
      • 2init方法
      • 3aggregate方法
      • 4complete方法
    • 四態查詢與分區持久化
    • 五投影
  • 二重新分區操作
  • 三 聚合操作
  • 四 流分組操作
  • 五 合并與連接
  • 六 一些注意事項
    • 一歸納及思考
    • 二partitionBy與groupBy
    • 三partitionAggregate與aggregate
    • 四partitionPersistence與persistentAggregate
    • 五分組與聚合

官方文檔請參考:https://storm.apache.org/documentation/Trident-API-Overview.html

零、 概述

Trident的核心數據模型是一系統批處理的流,流被分發到集群的節點上,對流的操作也并行在各個分區中進行。
在Trident中,分區的概念大致與task對應,并行在各個分區中進行,也就是并行在各個task中進行。
對比于store core的統一bolt,Trident提供了各種各樣的函數操作,方便用戶直接調用,主要有以下5類:

1、 本地分區操作

(1)應用在本地的每個分區,不需要網絡傳輸
(2)包括Function,filter, partitionAggregate, stateQuery/partitionPersist, projection
(3)以filter為例,它會對這個分區(task)中的數據進行過濾,再發送出去,不會涉及其它分區的內容。

2、 重新分區操作

(1)重新分區一個流,但不改變其內容,需要網絡傳輸,即在不同的task間交換數據
(2)包括shuffle, broadcast, partitionBy, global, batchGlobal,partition等
(3)以partitionBy為例,它會將各個分區中的數據根據字段值分區到各個目標分區中,保證相同字段值在同一個分區中,但消息中的內容是沒有被改變的,只是重新作了分區。

3、 聚合操作

(1)對數據流進行聚合,會有網絡傳輸,輸出結果是其聚合操作的結果
(2)包括aggregate、persistenAggregate。
(3)一般需要一個聚合函數作為參數,指定如何進行聚合,如trident自身提供的Sum, Count等類,它們都實現了Aggregator接口。

4、 流分組操作

(1)對數據流進行分區并分組,即根據字段的值先分到不同的分區,然后在分區內再根據字段的值進行分組。
(2)主要包括groupBy。
(3)如果在一個流分組中運行聚合器,聚會會在每個組內運行。persistenAggregate也可以運行在一個GroupStream中,在這種情況下,結果將保存在一個按關鍵字段進行分組的MapState中。

5、合并與連接

(1)合并或者連接幾個數據流
(2)包括merger, join等

一、 本地分區操作

(一) 函數

輸出元組的字段附加到原輸入元組字段后面,一般通過繼承BaseFuntion來實現。

(二) filter

把一個元組作為輸出,調用filter的isKeep方法,判斷是否保留這個元組。

(三) 分區聚合

partitionAggregate的具體使用可以參考trident State應用指南 http://blog.csdn.net/lujinhong2/article/details/49909945

簡單的說就是partitionAggregate在分區內調用一個實現了一個Aggregator接口的類,實現聚合操作,如Sum等。
關于partitionAggregate與aggregate請參考后面內容

內容如下:
這里涉及了一些trident常用的API,但project等相對容易理解,這里只介紹partitionAggregate的用法。

再看看上面代碼中對partitionAggregate的使用:

Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))

第一,三個參數分別表示輸入流的名稱與輸出流的名稱。中間的NameCountAggregator是一個Aggregator的對象,它定義了如何對輸入流進行聚合。我們看一下它的代碼:

public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個名字是否已經存在于map中,若無,則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}

(1)Aggregator接口

它實現了Aggregator接口,這個接口有3個方法:

public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }

init方法:在處理batch之前被調用。init的返回值是一個表示聚合狀態的對象,該對象會被傳遞到aggregate和complete方法。
aggregate方法:為每個在batch分區的輸入元組所調用,更新狀態
complete方法:當batch分區的所有元組已經被aggregate方法處理完后被調用。

除了實現Aggregator接口,還可以實現ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見《從零開始學storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html

下面我們看一下這3個方法的實現。

(2)init方法

@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }

僅初始化了一個HashMap對象,這個對象會作為參數傳給aggregate和complete方法。對一個batch只執行一次。

(3)aggregate方法

aggregate方法對于batch內的每一個tuple均執行一次。這里將這個batch內的名字出現的次數放到init方法所初始化的map中。

@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }

(4)complete方法

這里在complete將aggregate處理完的結果發送出去,實際上可以在任何地方emit,比如在aggregate里面。
這個方法對于一個batch也只執行一次。

@Override public void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }

(四)態查詢與分區持久化

即stateQuery與partitionPersist,分別用于查詢與更新狀態源,注意均只對分區內的數據操作

(五)投影

即project操作,如果對字段[“a”,”b”,”c”]進行以下操作:

myStream.project(new Field(“b”)),則輸出流只包含字段b。

二、重新分區操作

重新分區操作運行一個函數改變元組在任務之間的分布,也可以調整分區的數量,它需要網絡傳輸。包括以下方法:
shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions
broadcast: Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do a stateQuery on every partition of data.
partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.
partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping

三、 聚合操作

(1)對數據流進行聚合,會有網絡傳輸,輸出結果是其聚合操作的結果
(2)包括aggregate、persistenAggregate。
(3)一般需要一個聚合函數作為參數,指定如何進行聚合,如trident自身提供的Sum, Count等類,它們都實現了Aggregator接口。

四、 流分組操作

(1)對數據流進行分區并分組,即根據字段的值先分到不同的分區,然后在分區內再根據字段的值進行分組。
(2)主要包括groupBy。
(3)如果在一個流分組中運行聚合器,聚會會在每個組內運行。persistenAggregate也可以運行在一個GroupStream中,在這種情況下,結果將保存在一個按關鍵字段進行分組的MapState中。

五、 合并與連接

(1)合并或者連接幾個數據流
(2)包括merger, join等

六、 一些注意事項

(一)歸納及思考

trident每次處理一個batch(先不考慮setMaxSpoutPending的情況),這個batch會被分配到多個task進行處理(即多個分區),然后這些task完成會后繼續發送到下游的目標task,直至所有的task都完成操作,再進行下一個batch的處理。
在task與task間的傳輸就需要用到這里提供的各種函數。

(二)partitionBy與groupBy

partitionBy根據字段的值進行哈希,然后根據目標分區的數量求模,以確定相應的值放到哪個分區中。因此可以保證相同的字段值放在同一個分區。但由于分區數量有限,而不同字段值的數量會較多,因此不同的字段值也可能放在一個分區中
groupBy根據字段的值做先做partitionBy,保證相同字段的值都已經在同一個分區,但同樣不同的字段值也有可能在一個分區,然后,在一個分區內再進行分組,保證每一個組內的字段值都是相同的,看看官方的示意圖:

  • partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.

  • groupBy: The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups tuples together whose group fields are equal.

(三)partitionAggregate()與aggregate()

注意,Stream類和Aggregator類均有aggregate()方法,這里指的是Stream類的aggregate()方法。同時簡單說一下二者的關系,Aggregator一般作為Stream類中aggregate()方法的參數,前者的aggregate方法用于對Stream中的每一個tuple進行處理。

partitionAggregate是對分區內的數據先進行一次聚合,而aggregate是對所有分區作聚合,舉個例子:

trident.newStream(“TRIDENT_SPOUT”, new MySpout()).partitionAggregate(new Sum(), new Fields(“out1”)).parallelismHint(10).aggregate(new Fields(“out1”), new Sum(), new Fields(“out2”))

上述例子中,Spout發送一些數據,然后partitionAggregate先在各個分區內計算sum,最后把結果發送出去,aggregate匯總各個分區的內容再作一次聚合(sum)得出最后的結果,因此一般而言,aggregate有多個task同時執行的意義不大,除非是允許多個批次同時執行的情況,可以每個aggregate處理一個批次。

* 注意二者均使用一個實現了Aggregator接口的類對象作為參數*,如:

public class Sum implements CombinerAggregator<Number>

(四)partitionPersistence()與persistentAggregate()

  • 一般的state使用partitionPersistence()
  • MapSate使用persistentAggregate()

均是用于將處理結果進行持久化的。

(五)分組與聚合

分組(groupBy)是將數據按照key分成一個一個的組
聚合(aggregate)是將數據根據某種運算規則(如Sum)聚在一起,計算一個結果。
按鍵聚合(aggregateByKey)是將數據在相同的key范圍內聚合在一起,比如同一個key的值相加。(spark的api)
persistenceAggregate接收的是一個GroupStream,經在已經分好組的組內進行聚合計算。

總結

以上是生活随笔為你收集整理的trident API指南的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。