NiFi 常用处理器(Processor)介绍
生活随笔
收集整理的這篇文章主要介紹了
NiFi 常用处理器(Processor)介绍
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
常用處理器(Processor)介紹
處理器的種類
為了創建有效的數據流處理流程, 用戶必須了解可用的處理器類型。NiFi 包含許多不同的處理器, 這些處理器提供了可從眾多不同系統中提取數據, 路由, 轉換, 處理, 拆分和聚合數據以及將數據分發到多個系統的功能。
下面將重點介紹一些最常用的處理器, 按功能對它們進行分類。
數據轉換
- CompressContent:壓縮或解壓
- ConvertCharacterSet:將用于編碼內容的字符集從一個字符集轉換為另一個字符集
- EncryptContent:加密或解密
- ReplaceText:使用正則表達式修改文本內容
- TransformXml:應用XSLT轉換XML內容
- JoltTransformJSON:應用JOLT規范來轉換JSON內容
路由和調解
- ControlRate:限制流程中數據流經某部分的速率
- DetectDuplicate:根據一些用戶定義的標準去監視發現重復的FlowFiles。通常與HashContent一起使用
- DistributeLoad:通過只將一部分數據分發給每個用戶定義的關系來實現負載平衡或數據抽樣
- MonitorActivity:當用戶定義的時間段過去而沒有任何數據流經此節點時發送通知。(可選)在數據流恢復時發送通知。
- RouteOnAttribute:根據FlowFile包含的屬性路由FlowFile。
- ScanAttribute:掃描FlowFile上用戶定義的屬性集,檢查是否有任何屬性與用戶定義的字典匹配。
- RouteOnContent:根據FlowFile的內容是否與用戶自定義的正則表達式匹配。如果匹配,則FlowFile將路由到已配置的關系。
- ScanContent:在流文件的內容中搜索用戶定義字典中存在的術語,并根據這些術語的存在或不存在來路由。字典可以由文本條目或二進制條目組成。
- ValidateXml:以XML模式驗證XML內容; 根據用戶定義的XML Schema,判斷FlowFile的內容是否有效,進而來路由FlowFile。
數據庫訪問
- ConvertJSONToSQL:將JSON文檔轉換為SQL INSERT或UPDATE命令,然后可以將其傳遞給PutSQL Processor
- ExecuteSQL:執行用戶定義的SQL SELECT命令,結果為Avro格式的FlowFile
- PutSQL:通過執行FlowFile內容定義的SQL DDM語句來更新數據庫
- SelectHiveQL:對Apache Hive數據庫執行用戶定義的HiveQL SELECT命令,結果為Avro或CSV格式的FlowFile
- PutHiveQL:通過執行FlowFile內容定義的HiveQL DDM語句來更新Hive數據庫
屬性提取
- EvaluateJsonPath:用戶提供JSONPath表達式(類似于XPath,用于XML解析/提取), 然后根據JSON內容評估這些表達式,用結果值替換FlowFile內容或將結果值提取到用戶自己命名的Attribute中。
- EvaluateXPath:用戶提供XPath表達式,然后根據XML內容評估這些表達式, 用結果值替換FlowFile內容或將結果值提取到用戶自己命名的Attribute中。
- EvaluateXQuery:用戶提供XQuery查詢,然后根據XML內容評估此查詢, 用結果值替換FlowFile內容或將結果值提取到用戶自己命名的Attribute中。
- ExtractText:用戶提供一個或多個正則表達式,然后根據FlowFile的文本內容對其進行評估, 然后將結果值提取到用戶自己命名的Attribute中。
- HashAttribute:對用戶定義的現有屬性列表的串聯進行hash。
- HashContent:對FlowFile的內容進行hash, 并將得到的hash值添加到Attribute中。
- IdentifyMimeType:評估FlowFile的內容, 以確定FlowFile封裝的文件類型。此處理器能夠檢測許多不同的MIME類型, 例如圖像, 文字處理器文檔, 文本和壓縮格式, 僅舉幾例。
- UpdateAttribute:向FlowFile添加或更新任意數量的用戶定義的屬性。這對于添加靜態的屬性值以及使用表達式語言動態計算出來的屬性值非常有用。該處理器還提供"高級用戶界面(Advanced User Interface)",允許用戶根據用戶提供的規則有條件地去更新屬性。
系統交互
- ExecuteProcess:運行用戶自定義的操作系統命令。進程的StdOut被重定向,以便StdOut的內容輸出為FlowFile的內容。此處理器是源處理器(不接受數據流輸入,沒有上游組件) - 其輸出預計會生成新的FlowFile,并且系統調用不會接收任何輸入。如果要為進程提供輸入,請使用ExecuteStreamCommand Processor。
- ExecuteStreamCommand:運行用戶定義的操作系統命令。FlowFile的內容可選地流式傳輸到進程的StdIn。StdOut的內容輸出為FlowFile的內容。此處理器不能用作源處理器 - 必須傳入FlowFiles才能執行。
數據提取
- GetFile:將文件內容從本地磁盤(或網絡連接的磁盤)流式傳輸到NiFi,然后刪除原始文件。此處理器應將文件從一個位置移動到另一個位置,而不是用于復制數據。
- GetFTP:通過FTP將遠程文件的內容下載到NiFi中,然后刪除原始文件。此處理器應將文件從一個位置移動到另一個位置,而不是用于復制數據。
- GetSFTP:通過SFTP將遠程文件的內容下載到NiFi中,然后刪除原始文件。此處理器應將文件從一個位置移動到另一個位置,而不是用于復制數據。
- GetJMSQueue:從JMS隊列下載消息,并根據JMS消息的內容創建FlowFile。可選地,JMS屬性也可以作為屬性復制。
- GetJMSTopic:從JMS主題下載消息,并根據JMS消息的內容創建FlowFile。可選地,JMS屬性也可以作為屬性復制。此處理器支持持久訂閱和非持久訂閱。
- GetHTTP:將基于HTTP或HTTPS的遠程URL的請求內容下載到NiFi中。處理器將記住ETag和Last-Modified Date,以確保不會持續攝取數據。
- ListenHTTP:啟動HTTP(或HTTPS)服務器并偵聽傳入連接。對于任何傳入的POST請求,請求的內容將作為FlowFile寫出,并返回200響應。
- ListenUDP:偵聽傳入的UDP數據包,并為每個數據包或每個數據包創建一個FlowFile(取決于配置),并將FlowFile發送到"success"。
- GetHDFS:監視HDFS中用戶指定的目錄。每當新文件進入HDFS時,它將被復制到NiFi并從HDFS中刪除。此處理器應將文件從一個位置移動到另一個位置,而不是用于復制數據。如果在集群中運行,此處理器需僅在主節點上運行。要從HDFS復制數據并使其保持原狀,或者從群集中的多個節點流式傳輸數據,請參閱ListHDFS處理器。
- ListHDFS / FetchHDFS:ListHDFS監視HDFS中用戶指定的目錄,并發出一個FlowFile,其中包含它遇到的每個文件的文件名。然后,它通過分布式緩存在整個NiFi集群中保持此狀態。然后可以在集群中,將其發送到FetchHDFS處理器,后者獲取這些文件的實際內容并發出包含從HDFS獲取的內容的FlowFiles。
- GetKafka:從Apache Kafka獲取消息,特別是0.8.x版本。消息可以作為每個消息的FlowFile發出,也可以使用用戶指定的分隔符一起進行批處理。
- GetMongo:對MongoDB執行用戶指定的查詢,并將內容寫入新的FlowFile。
數據出口/發送數據
- PutEmail:向配置的收件人發送電子郵件。FlowFile的內容可選擇作為附件發送。
- PutFile:將FlowFile的內容寫入本地(或網絡連接)文件系統上的目錄。
- PutFTP:將FlowFile的內容復制到遠程FTP服務器。
- PutSFTP:將FlowFile的內容復制到遠程SFTP服務器。
- PutJMS:將FlowFile的內容作為JMS消息發送到JMS代理,可選擇將Attributes添加JMS屬性。
- PutSQL:將FlowFile的內容作為SQL DDL語句(INSERT,UPDATE或DELETE)執行。FlowFile的內容必須是有效的SQL語句。屬性可以用作參數,FlowFile的內容可以是參數化的SQL語句,以避免SQL注入攻擊。
- PutKafka:將FlowFile的內容作為消息發送到Apache Kafka,特別是0.8.x版本。FlowFile可以作為單個消息或分隔符發送,例如可以指定換行符,以便為單個FlowFile發送許多消息。
- PutMongo:將FlowFile的內容作為INSERT或UPDATE發送到Mongo。
分裂和聚合
- SplitText:SplitText接收單個FlowFile,其內容為文本,并根據配置的行數將其拆分為1個或多個FlowFiles。例如,可以將處理器配置為將FlowFile拆分為多個FlowFile,每個FlowFile只有一行。
- SplitJson:允許用戶將包含數組或許多子對象的JSON對象拆分為每個JSON元素的FlowFile。
- SplitXml:允許用戶將XML消息拆分為多個FlowFiles,每個FlowFiles包含原始段。這通常在多個XML元素與"wrapper"元素連接在一起時使用。然后,此處理器允許將這些元素拆分為單獨的XML元素。
- UnpackContent:解壓縮不同類型的存檔格式,例如ZIP和TAR。然后,歸檔中的每個文件都作為單個FlowFile傳輸。
- SegmentContent:根據某些已配置的數據大小將FlowFile劃分為可能的許多較小的FlowFile。不對任何類型的分界符執行拆分,而是僅基于字節偏移執行拆分。這是在傳輸FlowFiles之前使用的,以便通過并行發送許多不同的部分來提供更低的延遲。而另一方面,MergeContent處理器可以使用碎片整理模式重新組裝這些FlowFiles。
- MergeContent:此處理器負責將許多FlowFiles合并到一個FlowFile中。可以通過將其內容與可選的頁眉,頁腳和分界符連接在一起,或者通過指定存檔格式(如ZIP或TAR)來合并FlowFiles。FlowFiles可以根據公共屬性進行分箱(binned),或者如果這些流是被其他組件拆分的,則可以進行"碎片整理(defragmented)"。根據元素的數量或FlowFiles內容的總大小(每個bin的最小和最大大小是用戶指定的)并且還可以配置可選的Timeout屬性,即FlowFiles等待其bin變為配置的上限值最大時間。
- SplitContent:將單個FlowFile拆分為可能的許多FlowFile,類似于SegmentContent。但是,對于SplitContent,不會在任意字節邊界上執行拆分,而是指定要拆分內容的字節序列。
HTTP
- GetHTTP:將基于HTTP或HTTPS的遠程URL的內容下載到NiFi中。處理器將記住ETag和Last-Modified Date,以確保不會持續攝取數據。
- ListenHTTP:啟動HTTP(或HTTPS)服務器并偵聽傳入連接。對于任何傳入的POST請求,請求的內容將作為FlowFile寫出,并返回200響應。
- InvokeHTTP:執行用戶配置的HTTP請求。此處理器比GetHTTP和PostHTTP更通用,但需要更多配置。此處理器不能用作源處理器,并且需要具有傳入的FlowFiles才能被觸發以執行其任務。
- PostHTTP:執行HTTP POST請求,將FlowFile的內容作為消息正文發送。這通常與ListenHTTP結合使用,以便在無法使用s2s的情況下在兩個不同的NiFi實例之間傳輸數據(例如,節點無法直接訪問并且能夠通過HTTP進行通信時代理)。 注意:除了現有的RAW套接字傳輸之外,HTTP還可用作s2s傳輸協議。它還支持HTTP代理。建議使用HTTP s2s,因為它更具可擴展性,并且可以使用具有更好用戶身份驗證和授權的輸入/輸出端口的方式來提供雙向數據傳輸。
- HandleHttpRequest / HandleHttpResponse:HandleHttpRequest Processor是一個源處理器,與ListenHTTP類似,啟動嵌入式HTTP(S)服務器。但是,它不會向客戶端發送響應(比如200響應)。相反,流文件是以HTTP請求的主體作為其內容發送的,所有典型servlet參數、頭文件等的屬性作為屬性。然后,HandleHttpResponse能夠在FlowFile完成處理后將響應發送回客戶端。這些處理器總是彼此結合使用,并允許用戶在NiFi中可視化地創建Web服務。這對于將前端添加到非基于Web的協議或圍繞已經由NiFi執行的某些功能(例如數據格式轉換)添加簡單的Web服務特別有用。
總結
以上是生活随笔為你收集整理的NiFi 常用处理器(Processor)介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 事件跟踪_ORACLE 事件
- 下一篇: send返回值