日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

有赞客户行为收集与实时处理系统设计

發布時間:2025/4/5 windows 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 有赞客户行为收集与实时处理系统设计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://tech.youzan.com/realtime_customer_data_collection/

背景

有贊會員系統主要承載著有贊的客戶經營領域,致力于給商家提供全渠道客戶經營的能力。隨著社交網絡的普及,其社會化、多元化和創新化特質讓商家與消費者之間的聯系方式更加豐富,互動更加頻繁,相應的運營需求也大大增加。除了傳統的會員經營手段之外,會員系統需要提供能力,來幫助商家定義客戶的生命周期,構建精準的消費畫像:商家可以由此全面、及時地了解客戶的喜好、行為軌跡、消費能力等屬性,定義進而進行差異化的客戶經營。我們需要構建一套實時的客戶行為收集處理系統,來滿足上述業務需求。本文就簡單聊聊客戶行為收集系統的設計。

行為模型

我們把客戶行為事件定義為客戶與業務系統間的交互,客戶行為事件模型則描述了客戶在業務系統中的軌跡。它記錄了某個業務場景下一類或多類的客戶行為事件,并能夠反映事件的先后順序。通過對客戶事件的研究,我們可以評估客戶事件的發生以及它對企業價值的影響程度,預測相關事件的發生;或通過追蹤客戶行為或業務過程,研究與事件發生關聯的所有因素,來挖掘用戶行為事件背后的原因、交互影響等。

對于客戶行為事件,除客戶本身的標識外,我們還要定義關注的事件的業務屬性(如點擊商品事件中的商品信息、下單下單商品及交易屬性等),以及事件窗口的長度。其實體關系大致描述如下:

邏輯架構

邏輯架構共分為三層:

  • 客戶端 (Client)

客戶端主要包括兩類角色:一是客戶行為事件的產生源,另一類是客戶行為的輸出方。

  • 收集器 (Collector)

收集器的主要職責是從客戶端獲取客戶事件,并按照行為模型的定義轉換數據格式。考慮到客戶端的差異,收集器需要支持“推”和“拉”兩種模式

  • 推模式:由收集器提供收集接口,由客戶端調用;或客戶端嵌入SDK的方式,將行為事件推送給收集器
  • 拉模式:由收集器通過定時任務或消息隊列,從業務方系統獲取客戶事件

獲取到客戶事件之后,收集器根據預設或自定義的路由規則,將事件發布到分布式隊列服務中。在有贊我們使用的是?NSQ?(關于NSQ,可以移步重塑NSQ之路?系列了解更多詳情)。

隊列的消費端是流式計算引擎,通過引擎處理,最終將事件數據發送給存儲層。

  • 存儲層 (Storage)

存儲層會根據業務需要選擇MySQL或者HBase來事件數據的持久化。目前我們使用的是HBase,主要考慮因素是:1. HBase具有相對靈活的Schema 2. 與Hadoop集群的集成的便捷,使得事件數據不僅僅能夠支持實時處理,也能輕松地用于離線分析。

  • 服務層 (Service)

服務層用以支撐客戶端/外部系統對于客戶行為的查詢,目前只支持以隨機讀。

數據流

整個客戶行為收集系統主要有兩個方向的數據流:

  • 處理流

處理流將來自各個系統(包括業務系統、H5頁/App等)的客戶行為,按照行為模型定義的消息格式,發布到分布式隊列服務 (NSQ) ,由流式計算框架 (Storm) 對消息進行消費,并持久化到存儲層 (HBase或MySQL) 中。

  • 輸出流

輸出流的邏輯相對簡單,將存儲的客戶行為從存儲層讀取出來,通過查詢服務提供給使用方。目前我們的使用場景只涉及到隨機讀取。另外如果需要在離線分析(如 Hive)中使用的話,則可以通過Hive的 External Table 集成。

系統設計要求

實時性

作為客戶行為收集系統,實時性越高,對于商家就能夠更為及時地挖掘客戶特征、進行實時推薦或發現一些突發的狀況。因此,實時性是該系統的重要非功能性指標之一。在構建實時系統時,我們常常需要解決如下問題:

  • 突發的流量
  • 部分組件故障導致大量消息需要重試
  • 數據積壓
  • 業務邏輯的bug需要進行數據的重新處理

我們引入了Storm作為支撐整個客戶行為系統實時性的組件。Storm作為最早的開源分布式實時計算框架,被行業廣泛地應用于生產環境。它能夠支持到消息粒度的控制與處理,具有很好的容錯性、擴展性;從模型上來說,Storm的Continueous Streaming模型相對于Micro Batch模型能夠滿足更嚴格的時延要求(當然相對更低的時延的帶來了更高的開銷,在吞吐方面的表現較Micro Batch模型遜色)。

Storm的向外擴展 (Scale Out) 能力強大,能夠通過調節worker數量并重啟Topology (拓撲,Storm的計算任務)來完成計算能力的擴展。

當有基礎組件發生故障時,Storm Topology 的對應部分(Spout/Bolt)無法在 TOPOLOGY.MESSAGE.TIMEOUT內處理完消息,會觸發Storm的重試;如果短時間內重試消息過多,勢必會影響新生產的消息的消費,從而造成數據的延遲。因此,我們使用兩組Topic來應對這種場景。

如上圖所示,正常的生產客戶行為事件消息由生產者發布到?Normal Topic,由對應的Storm Spout來消費。當Topology的部分業務異常時,會觸發到Spout的失敗處理,此時Spout將消息publish到?Retry Topic?,由重試的Spout按照一定的策略進行重試;或者在超過一定重試次數后,Ack此消息,并發布一個延時的離線補償任務進行該業務單元的全量計算。下圖展示了一種重試的策略。

對于需要重新處理數據的場景,我們可以使用分布式隊列服務的消息重放功能,由Spout進行重新消費。

消息抵達保證

Storm 本身支持多種消息抵達保證的語義:通過 Acker 和 Reliability API 來實現 At Least Once 語義;通過?Trident?來實現 Exactly Once 語義。對于消息抵達,我們有如下幾個要求:?
1. 業務對于實時數據的要求是保證消息不丟失?
2. 部分業務場景需要我們支持?Exactly Once?語義。?
此外,作為Storm的Source,NSQ本身會保證消息一定推給consumer —— 即在極小的機率下NSQ Server會重復推送消息給Consumer。綜合上述情況,結合前面提到的多種重試場景的支持,我們使用 At Least Once 語義,而由具體的處理邏輯來保證冪等性。

public class NSQSpout extends BaseRichSpout { public void nextTuple() { String messageId = UUID.randomUUID().toString(); try { ... // 消息處理邏輯 this.collector.emit("stream_id", new Values(message), messageId); } catch (Exception e) { logger.error("Emit message failed. ", e); } } @Override public void ack(Object msgId) { // Get origin NSQMessage and finish it try { this.consumer.finish(NSQMessage message); } catch(NSQException e) { logger.error("Failed to ack message. id: {}", msgId, e); } } @Override public void fail(Object msgId) { logger.info("msg failed: {}", msgId); // 按照重試策略處理 } } public class BizBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { Object obj = tuple.getValues(); // Do your business ... this.collector.emit("another-stream-id", tuple, new Values(obj)); this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 聲明bolt的輸出 outputFieldsDeclarer.declareStream("log-stream", new Fields("content")); } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051

存儲

我們主要選擇HBase來存儲客戶的行為。在HBase中,是通過Rowkey, ColumnFamily+Qualifier及Timestamp來定位數據的。Rowkey作為唯一標識,在設計業務表Schema時主要需要考慮以下幾點:

  • 長度:盡可能短,HBase的持久化文件HFile是按照Key-Value存儲的,如果Rowkey過長,會影響HFile的存儲效率。
  • 散列:針對隨機讀取的場景,需要散列Rowkey來避免查詢熱點集中到一個RegionServer上。我們采用了兩種方式:

  • 隨機化(如MD5)
  • 當Rowkey中需要保存遞增的序列(如:時間戳),同時要求Rowkey可讀時??蓪⑵渌腎D(如:客戶ID)截取后N位+遞增序列拼接成Rowkey 。一種參考的實現方式如下。
// 方式一 byte[] rowkey = MessageDigest.getInstance("MD5").digest(identifier.getBytes()); // 方式二 byte[] rowkey = String.format("%08d%d", prefix, timestamp).getBytes(); 1234
  • 唯一性:在該業務上必須是唯一的。

部署

Storm Topology的部署相當容易,只需要上傳新的JAR包即可。NSQ Server保存了Consumer當前的Offset,只要我們通過相同的Consumer Name (Channel) 重連NSQ Server,即可獲得之前消費的Offset。當然,如果需要強制重新消費,調整對應Channel的Offset即可。

總結

實時系統能夠有效彌補離線"T+1"的短板,同時也有更為嚴格的時效性和容錯要求,其實時性、可用性、可擴展性各個方面值得去仔細推敲和打磨。支撐更多的業務場景,改善數據收集效率是我們持續改進的動力,歡迎有興趣的同學勾搭。?liyumeng@youzan.com

參考資料

  • https://storm.apache.org/releases/1.0.2/Guaranteeing-message-processing.html
  • https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide.html
  • https://hbase.apache.org/0.94/book/rowkey.design.html
  • https://storm.apache.org/releases/1.0.2/Acking-framework-implementation.html
  • 轉載于:https://www.cnblogs.com/davidwang456/articles/9239009.html

    總結

    以上是生活随笔為你收集整理的有赞客户行为收集与实时处理系统设计的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。