日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

伴鱼:借助 Flink 完成机器学习特征系统的升级

發布時間:2024/8/23 windows 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 伴鱼:借助 Flink 完成机器学习特征系统的升级 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介:?Flink 用于機器學習特征工程,解決了特征上線難的問題;以及 SQL + Python UDF 如何用于生產實踐。

本文作者陳易生,介紹了伴魚平臺機器學習特征系統的升級,在架構上,從 Spark 轉為 Flink,解決了特征上線難的問題,以及 SQL + Python UDF 如何用于生產實踐。 主要內容為:

  • 前言
  • 老版特征系統 V1
  • 新版特征系統 V2
  • 總結
  • 一、前言

    在伴魚,我們在多個在線場景使用機器學習提高用戶的使用體驗,例如:在伴魚繪本中,我們根據用戶的帖子瀏覽記錄,為用戶推薦他們感興趣的帖子;在轉化后臺里,我們根據用戶的繪本購買記錄,為用戶推薦他們可能感興趣的課程等。

    特征是機器學習模型的輸入。如何高效地將特征從數據源加工出來,讓它能夠被在線服務高效地訪問,決定了我們能否在生產環境可靠地使用機器學習。為此,我們搭建了特征系統,系統性地解決這一問題。目前,伴魚的機器學習特征系統運行了接近 100 個特征,支持了多個業務線的模型對在線獲取特征的需求。

    下面,我們將介紹特征系統在伴魚的演進過程,以及其中的權衡考量。

    二、舊版特征系統 V1

    特征系統 V1 由三個核心組件構成:特征管道,特征倉庫,和特征服務。整體架構如下圖所示:

    特征管道包括流特征管道批特征管道,它們分別消費流數據源和批數據源,對數據經過預處理加工成特征 (這一步稱為特征工程),并將特征寫入特征倉庫。

    • 批特征管道使用 Spark 實現,由 DolphinScheduler 進行調度,跑在 YARN 集群上;
    • 出于技術棧的一致考慮,流特征管道使用 Spark Structured Streaming 實現,和批特征管道一樣跑在 YARN 集群上。

    特征倉庫選用合適的存儲組件 (Redis) 和數據結構 (Hashes),為模型服務提供低延遲的特征訪問能力。之所以選用 Redis 作為存儲,是因為:

    • 伴魚有豐富的 Redis 使用經驗;
    • 包括?DoorDash Feature Store和?Feast在內的業界特征倉庫解決方案都使用了 Redis。

    特征服務屏蔽特征倉庫的存儲和數據結構,對外暴露 RPC 接口?GetFeatures(EntityName, FeatureNames),提供對特征的低延遲點查詢。在實現上,這一接口基本對應于 Redis 的?HMGET EntityName FeatureName_1 ... FeatureName_N?操作。

    這一版本的特征系統存在幾個問題:

    • 算法工程師缺少控制,導致迭代效率低。這個問題與系統涉及的技術棧和公司的組織架構有關。在整個系統中,特征管道的迭代需求最高,一旦模型對特征有新的需求,就需要修改或者編寫一個新的 Spark 任務。而 Spark 任務的編寫需要有一定的 Java 或 Scala 知識,不屬于算法工程師的常見技能,因此交由大數據團隊全權負責。大數據團隊同時負責多項數據需求,往往有很多排期任務。結果便是新特征的上線涉及頻繁的跨部門溝通,迭代效率低;
    • 特征管道只完成了輕量的特征工程,降低在線推理的效率。由于特征管道由大數據工程師而非算法工程師編寫,復雜的數據預處理涉及更高的溝通成本,因此這些特征的預處理程度都比較輕量,更多的預處理被留到模型服務甚至模型內部進行,增大了模型推理的時延。

    為了解決這幾個問題,特征系統 V2 提出幾個設計目的:

    • 將控制權交還算法工程師,提高迭代效率;
    • 將更高權重的特征工程交給特征管道,提高在線推理的效率。

    三、新版特征系統 V2

    特征系統 V2 相比特征系統 V1 在架構上的唯一不同點在于,它將特征管道切分為三部分:特征生成管道,特征源,和特征注入管道。值得一提的是,管道在實現上均從 Spark 轉為 Flink,和公司數據基礎架構的發展保持一致。特征系統 V2 的整體架構如下圖所示:

    1. 特征生成管道

    特征生成管道讀取原始數據源,加工為特征,并將特征寫入指定特征源 (而非特征倉庫)。

    • 如果管道以流數據源作為原始數據源,則它是流特征生成管道;
    • 如果管道以批數據源作為原始數據源,則它是批特征生成管道。

    特征生成管道的邏輯由算法工程師全權負責編寫。其中,批特征生成管道使用 HiveQL 編寫,由 DolphinScheduler 調度。流特征生成管道使用 PyFlink 實現,詳情見下圖:

    算法工程師需要遵守下面步驟:

  • 用 Flink SQL 聲明 Flink 任務源 (source.sql) 和定義特征工程邏輯 (transform.sql);
  • (可選) 用 Python 實現特征工程邏輯中可能包含的 UDF 實現 (udf_def.py);
  • 使用自研的代碼生成工具,生成可執行的 PyFlink 任務腳本 (run.py);
  • 本地使用由平臺準備好的 Docker 環境調試 PyFlink 腳本,確保能在本地正常運行;
  • 把代碼提交到一個統一管理特征管道的代碼倉庫,由 AI 平臺團隊進行代碼審核。審核通過的腳本會被部署到伴魚實時計算平臺,完成特征生成管道的上線。
  • 這一套流程確保了:

    • 算法工程師掌握上線特征的自主權;
    • 平臺工程師把控特征生成管道的代碼質量,并在必要時可以對它們實現重構,而無需算法工程師的介入。

    2. 特征源

    特征源存儲從原始數據源加工形成的特征。值得強調的是,它同時還是連接算法工程師和 AI 平臺工程師的橋梁。算法工程師只負責實現特征工程的邏輯,將原始數據加工為特征,寫入特征源,剩下的事情就交給 AI 平臺。平臺工程師實現特征注入管道,將特征寫入特征倉庫,以特征服務的形式對外提供數據訪問服務。

    3. 特征注入管道

    特征注入管道將特征從特征源讀出,寫入特征倉庫。由于 Flink 社區缺少對 Redis sink 的原生支持,我們通過拓展?RichSinkFunction簡單地實現了?StreamRedisSink?和?BatchRedisSink,很好地滿足我們的需求。

    其中,BatchRedisSink?通過?Flink Operator State?和?Redis Pipelining的簡單結合,大量參考 Flink 文檔中的?BufferingSink,實現了批量寫入,大幅減少對 Redis Server 的請求量,增大吞吐,寫入效率相比逐條插入提升了 7 倍?。BatchRedisSink?的簡要實現如下。其中,flush?實現了批量寫入 Redis 的核心邏輯,checkpointedState?/?bufferedElements?/?snapshotState?/?initializeState?實現了使用 Flink 有狀態算子管理元素緩存的邏輯。

    class BatchRedisSink(pipelineBatchSize: Int ) extends RichSinkFunction[(String, Timestamp, Map[String, String])]with CheckpointedFunction {@transientprivate var checkpointedState: ListState[(String, java.util.Map[String, String])] = _private val bufferedElements: ListBuffer[(String, java.util.Map[String, String])] =ListBuffer.empty[(String, java.util.Map[String, String])]private var jedisPool: JedisPool = _override def invoke(value: (String, Timestamp, Map[String, String]),context: SinkFunction.Context): Unit = {import scala.collection.JavaConverters._val (key, _, featureKVs) = valuebufferedElements += (key -> featureKVs.asJava)if (bufferedElements.size == pipelineBatchSize) {flush()}}private def flush(): Unit = {var jedis: Jedis = nulltry {jedis = jedisPool.getResourceval pipeline = jedis.pipelined()for ((key, hash) <- bufferedElements) {pipeline.hmset(key, hash)}pipeline.sync()} catch { ... } finally { ... }bufferedElements.clear()}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.clear()for (element <- bufferedElements) {checkpointedState.add(element)}}override def initializeState(context: FunctionInitializationContext): Unit = {val descriptor =new ListStateDescriptor[(String, java.util.Map[String, String])]("buffered-elements",TypeInformation.of(new TypeHint[(String, java.util.Map[String, String])]() {}))checkpointedState = context.getOperatorStateStore.getListState(descriptor)import scala.collection.JavaConverters._if (context.isRestored) {for (element <- checkpointedState.get().asScala) {bufferedElements += element}}}override def open(parameters: Configuration): Unit = {try {jedisPool = new JedisPool(...)} catch { ... }}override def close(): Unit = {flush()if (jedisPool != null) {jedisPool.close()}} }

    特征系統 V2 很好地滿足了我們提出的設計目的。

    • 由于特征生成管道的編寫只需用到 SQL 和 Python 這兩種算法工程師十分熟悉的工具,因此他們全權負責特征生成管道的編寫和上線,無需依賴大數據團隊,大幅提高了迭代效率。在熟悉后,算法工程師通常只需花費半個小時以內,就可以完成流特征的編寫、調試和上線。而這個過程原本需要花費數天,取決于大數據團隊的排期;
    • 出于同樣的原因,算法工程師可以在有需要的前提下,完成更重度的特征工程,從而減少模型服務和模型的負擔,提高模型在線推理效率。

    四、總結

    特征系統 V1 解決了特征上線的問題,而特征系統 V2 在此基礎上,解決了特征上線難的問題。在特征系統的演進過程中,我們總結出作為平臺研發的幾點經驗:

    • 平臺應該提供用戶想用的工具。這與 Uber ML 平臺團隊在內部推廣的經驗相符。算法工程師在 Python 和 SQL 環境下工作效率最高,而不熟悉 Java 和 Scala。那么,想讓算法工程師自主編寫特征管道,平臺應該支持算法工程師使用 Python 和 SQL 編寫特征管道,而不是讓算法工程師去學 Java 和 Scala,或是把工作轉手給大數據團隊去做;
    • 平臺應該提供易用的本地調試工具。我們提供的 Docker 環境封裝了 Kafka 和 Flink,讓用戶可以在本地快速調試 PyFlink 腳本,而無需等待管道部署到測試環境后再調試;
    • 平臺應該在鼓勵用戶自主使用的同時,通過自動化檢查或代碼審核等方式牢牢把控質量。

    原文鏈接
    本文為阿里云原創內容,未經允許不得轉載。?

    總結

    以上是生活随笔為你收集整理的伴鱼:借助 Flink 完成机器学习特征系统的升级的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。