李晋面试总结
集群規模
- 團隊規模 5個人 OLAP 1 數據采集1 報表 2 組長
- 數據量 3 千萬條 實時 5-6 百萬條
- 12核 24 線程 內存 64 G 硬盤 8 T * 8 個
- kafka 3臺 副本兩個 mysql 4臺 hdfs + yarn
- CDH 6.2
- hadoop 3.0.3
- hive 2.1
- spark 2.4
- linux版本CentOS-7 就是linux7
集群安裝步驟 : 免密 ip映射 集群同步
公司概況
自我介紹!!!
面試官您好 我叫李晉 畢業后一直從事大數據開發這一行業,離線數據分析和實時數據分析之前都有做過,包括數倉的搭建也都有參與過,之前工作離線主要是基于hive和spark來進行開發,實時使用的是flink進行的開發,另外平常的話喜歡逛逛博客呀,學習學習新技術,自己呢也會寫一點總結什么的以上就是我的一點個人介紹項目背景
根據大數據分析 實現精準的廣告投放 以便提高公司業務量 同時的話可以根據用戶的消費水平 消費行為 推薦給用戶對應的商品
基礎流量指標分析 (流量會話聚合表) (指標有 日/周/月 pv uv 總訪問時長等)
- 比如統計日活 月活這些指標 我們可以清晰的看出我們的產品目前的狀態 根據日活月活分析判斷我們的產品是否需要更進一步的推廣
- 假如當前日活月活穩定在一個正常值,但是增長緩慢,可以嘗試進一步推廣,擴大用戶規模
歸因事件分析
首次歸因 進行分析 同一個用戶購買一件商品歸因分析 先按時間將待歸因事件進行過濾 然后在取出第一個事件 得到的就是首次歸因 和業務人員以及前端人員一起商量
比如提交訂單這個事件
- 他可能有多種行為導致他提交訂單的操作
- 包括自己搜索呀, 點擊某個廣告位 某個運營位 點擊他人分享等 我們一般會和運營人員進行商討 選擇合適的歸因模型
- 比如選擇首次歸因 判斷用戶每次提交訂單的行為是從那個行為上最先感興趣的的
- 如果大部分都是某個廣告位點擊進行的 那我們就提高此廣告位的曝光率 以便提高訂單的轉換率
離線數據系統的整體架構
數倉分為 行為域 業務域
主要負責行為域域
項目是一個純離線的項目 , 整體上分三層 , 數據采集層 , 數據計算層 , 數據服務層
數據整體框架流向
離線
- 日志數據 使用 flume 采集至 hdfs 中(采集到的都是 json 格式的數據) flume 構建攔截器將臟數據直接過濾
- 基于hive搭建的一個離線的數倉 hive中分了四層 ods dwd dws ads
- 將hdfs中的數據 直接load到hive的 ods 也是就貼源層
- 經過一系列的數據清洗轉換集成等操作形成一張大寬表放置到 dwd 層
- 將數據分主題進行計算各類成品表 因為有一些的成品表可能很多數據都可以由一張中間表得出 我們就制作了中間表放置在dws層
- 最后ads層就是我們的成品報表
- 成品報表我們一般會導入至hbase中以供后續提供給運營部進行分析什么的 這也就是olap那塊了
- 其他的比如多維統計我們一開始使用的是hive的高階聚合函數 后來的話使用的是kylin
- 一些臨時性的指標查詢我們使用的是 presto 基于內存計算 對接多種數據源
- 第二個項目是對第一個項目進行了一些改進和擴展 改進主要是ods - dwd 數據處理那塊
- 匿名數據的標識我們之前采用的是直接使用設備ID作為匿名數據的標識
- 改進后使用動態綁定的方式 做了一張設備賬號評分關聯表 根據每個設備上賬號的登錄次數我們給與評分 將匿名用戶歸給評分高的用戶 當然了如果該設備沒有登錄過任何一個賬號 那就還使用設備ID
- 新增了用戶畫像板塊 主要就是提取一些基本標簽
實時
- 對離線數據采集部分進行了更改 使用flume將數據直接采集到kafka 中
- 通過編寫flink程序對數據進行處理后使用側流輸出將離線部分的數據寫到hdfs中
- 實時使用的數據分主題側流輸出到kafka對應的topic中
- 業務數據采用的canal 實時將mysql的增量數據采集對kafka對應的topic中
- 對各主題的數據進行計算然后導入至clickHouse中或redis中 以供數據的展示使用
數據建模!!!
-
一般的數據建模分兩種
- 一種是自上而下 也就是從需求出發 根據業務需求 對接運營人員 整理所需要的各種指標 根據這些指標然后自上而下尋找可以得出這些指標的事實表 維度表等 著這樣一步一步捋清楚 然后進行分析開發
- 自下而上 從數據源出發 根據我們之前埋點的數據分析能夠計算的各種指標 和運營人員對接 溝通所需要的指標 然后進行分析
-
關于大數據開發 , 阿里提出過 oneData標準, 里面提出了建模時指標統一規范定義 對數據開發的幫助其實是很有幫助的 因為之前開發的時候可能遇到兩個表join的問題 , 比如按id進行join id這個字段有int 類型的也有String 類型的 數據類型不統一 , 進行join的話默認是按 int類型來進行join 所有的String類型字段就會進入同一個reduce中 導致數據傾斜 解決辦法就是使用函數 cast( id as int )
-
具體我自己實際操作也沒有什么大的方向的建模 也就是一些小的表的建模
-
比如之前給我一個需求,就是計算新用戶留存
-
一開始我們那邊沒有一個固定的規劃,留存分析的需求會比較隨機,比如某一天突然來一個需求,求X日的新用戶在Y天(比如7天)后的留存。面對這種需求,我們都是臨時去拿X號的日活表join 7天后的日新表來計算。
-
還有像這樣的需求,指定日期段內,比如10.01 -> 10.20號期間,查詢出連續活躍天數超過5天的用戶,我們也是用臨時手段來做。
-
后來,我為這一類的需求(留存,活躍分析等),就設計了一套模型,為這一類的需求分析提高了便利和效率
-
我是這樣做的
- 首先,我設計了一個表模型,叫做 用戶連續活躍區間記錄,表里面主要記錄
- 用戶的首訪日期,用戶的活躍區間起始日,用戶的活躍區間結束日
- 如果在計算日仍然活躍的用戶,則它的最后一個活躍區間的結束日為9999-12-31
- 這個表的設計,我借鑒了“拉鏈表”的思想
-
有了這個表之后,計算留存分析,就變得很容易;只需要查看連續活躍區間表中連續活躍結束日是9999 的就可以了
-
計算用戶連續活躍天數這種需求,也很容易直接group by 用戶 max(datediff ( 時間差 ) ) 就可以得出了
數據采集
- 將app端的日志數據通過flume采集到hdfs , 當時的為了保證數據采集的安全可靠 , 我們采用了 taildir + filechannel + hdfssink 同時為了防止 日志服務器連接hdfs要開啟太多的連接, 我們配置了級聯的方式來采集, 還將sink的策略選擇了 fileover ,保證其中sink掛掉之后還能正常工作 同時, 我們在flume channel中里面還設置了一個攔截器, 直接將有問題的數據過濾掉了, 然 據里面的時間取出來,讓對應的數據進入到對應時間的的文件夾 因為flume中可以設置header 和 body 在sink寫入文件中時我們使用通配符的方式 直接讓他取header中找對應的數據
- 業務數據的話我們采用的是sqoop來進行采集的, 直接將mysql中的數據直接采集到了hive中 這邊的數據一般都是采用了增量抽取的方式來進行采集的
數據計算層
-
數據計算層的話我們主要是 基于hive做的一個離線數據分析, 建模的話 采用的是維度建模, 分了四層, 包括就是 ods , dwd , dws , ads
-
ods 的數據就是我們flume采集到的數據 , flume采集到的數據放在hdfs中 , 我們直接在hive中建表然后load到表中, 因為采集到的數據都json格式數據嘛, 我們就建表的時候指定了jsonserde
-
然后我們使用spark 程序對ods層的數據進行了一些加工處理, 包括集成了一些地理維度數據, 過濾掉了一些缺少關鍵字段信息的數據, 我們在這里還將匿名數據的問題進行了解決 , 之前時直接使用設備ID來作為匿名數據的用戶 之后進行了改進使用動態綁定的方式來對匿名用戶進行確定 主要就是做了一個 設備賬號評分關聯表
- 設備賬號評分關聯表
一個設備可能登錄了多個用戶 , 這張表保存了每個設備上每個賬號的登錄得分 登錄一次得分 +100 沒有登錄則分數就會衰減- 數據過來 取出數據中 用戶id 和 設備ID ,如果有用戶ID ,則使用用戶ID 作為 該數據的標識
- 如果沒有用戶ID ,則拿 設備ID 去關聯評分字典查詢,取出分值最高的 用戶ID 作為 guid
- 如果設備ID 和字典表沒有關聯上 ,就使用設備ID作為 guid
- 賬號設備評分關聯表實現流程
數據中存在匿名訪問數據 我們需要對匿名訪問數據關聯一個guid
因為設備ID是必須存在 不存在我們就直接過濾掉了 但是一個設備上可能登錄多個賬號我們需要選擇一個作為guid 使用動態綁定的方式 構建了一張設備賬號評分關聯表 表中的字段包括 設備ID 賬號ID 得分 最后登錄時間- 按設備和賬號進行分組, 計算每個分組內會話次數, 每次會話算一次登錄, 一次登錄得 100分
- 將 計算好的結果 和 前一天的設備賬號評分關聯表 數據進行full join 連接條件 登錄ID 和 設備ID 相同
- 前一天 登錄 今天 登錄 score相加
- 前一天 登錄 今天 沒有登錄 score衰減 為 0.7
- 前一天 沒有登錄 今天 登錄 score為 今天 的值
- 設備賬號評分關聯表
-
集成地理位置信息
- 日志數據中一般只記錄了經緯度 沒有記錄對應的省市區,需要集成
- 公司有對應的地理位置字典表 但是字段是省市區 經緯度 使用geohash的方式將數據進行了處理 變成 省市區 geohash值 然后廣播出去
- 將數據中的經緯度也轉換成geohash值 到廣播的數據中匹配 匹配的上就取出對應的省市區
- 匹配不上就使用ip2region進行查找匹配
-
dwd層放的就是我們處理好的明細寬表
-
dws層的數據主要就是對最終的結果報表數據做準備的
- dws這一層我們分主題做了一些輕度聚合表和一些中間表, 基礎流量指標分析 , 活躍度相關報表的分析等等的, 主要是為了最后的結果數據做準備, 比如流量主題的我當時做了有流量會話聚合表 (指標有 日/周/月 pv uv 總訪問時長等),
- 用戶活躍度相關分析, 做了一個日活明細表, 保存了當天活躍的用戶, 這個表就是來自于流量會話聚合表, 取出了里面所有的guid然后又做了一個用戶連續活躍區間記錄表, 這是一個拉鏈表, 可以清楚的看到用戶那天活躍那天沒活躍
-
最后ads層 , 這里面就是放的是一些我們計算時用到的需要進行展示的表
-
接下來就是數據服務層的一些東西了, 也就是olap平臺 這塊的話也分三部分,
- 第一部分 常規的我們保存在ads層的一些固定的報表, 這些表我們一般就是直接使用 blukload 導入到hbase中, 用來做數據的可視化展現
- 寫spark程序將
- 多維數據分析, 我們之前的時候直接用的hive的高階聚合函數 , 像 with cube , grouping sets(可自己定義聚合維度) , with rollup(層級聚合), 后來引入了kylin , 就使用kylin來做, 聚合好的結果默認就是保存在hbase中
- 然后的話就是一些非固定模型的在線實時計算, 我們當時使用的是presto , 他是一個純內存的計算引擎, 可以對接多種數據源
- 第一部分 常規的我們保存在ads層的一些固定的報表, 這些表我們一般就是直接使用 blukload 導入到hbase中, 用來做數據的可視化展現
-
整體的話就是這樣, 其他的話, 比如當時我們元數據管理系統用的是atlas 任務調度的話當時使用的是azkaban
-
Atlas跟大數據中各種數據源組件進行了深度整合,它可以自動去獲取這些數據源組件中的數據的元信息,納入自己的存儲提供管理查看,可以省卻大量的人工錄入元信息工作
-
寫各種的腳本然后然后根據互相之間的依賴呀串成一串 , 提交到azkaban上去執行
業務域 :
主要從業務庫表中拿去數據來進行分析, 業務庫的數據一般來自業務系統的數據庫mysql中, 主要表格包括 :
-
cms 是網站的內容管理相關表 發帖, 回帖…
-
oms 是訂單相關的表, 和交易有關的 生成訂單, 添加購物車…
-
pms 是關于產品相關的表, 產品評論 內容 回復 …
-
sms 營銷類表 優惠卷 廣告 …
-
ums 會員相關的 …
-
ods層 : 放置了每天的用戶,訂單等等的增量數據
-
dw層 : 將ods層中的增量導入dwd層, 同時對一些重要的表做了拉鏈表(全量表) 例 訂單表, 緩慢變化維度表. 便于查詢歷史上任何一天的數據的狀態,
-
dws層 : 主要是一些寬表, 根據需求將事實表和維度表進行join形成一張寬表, 以此為基礎計算所需的報表
主要的策略 : 大表 增量抽取 小表 全量抽取
主要負責!!!
- 基礎流量指標分析 (流量會話聚合表) (指標有 日/周/月 pv uv 總訪問時長等)
- 用戶活躍度主題 (拉鏈表 用戶活躍區間) ( 指標 連續活躍超過 5 天 10天 連續沉默 … 用戶留存 )
- 歸因事件分析(支付訂單 這個操作可能有多個條件導致 通過點擊某個運營位,點擊廣告位看到點進去購買 或者直接搜索) 一般使用首次歸因進行分析 同一個用戶購買一件商品歸因分析 先按時間將待歸因事件進行過濾 然后在取出第一個事件 得到的就是首次歸因 和業務人員以及前端人員一起商量
- 多維指標的統計 之前的話是使用hive的高階聚合函數來進行 后使用kylin
離線指標計算
基礎指標
基礎流量指標分析 (流量會話聚合表) (指標有 日/周/月 pv uv 總訪問時長等)
- 比如統計日活 月活這些指標 我們可以清晰的看出我們的產品目前的狀態 根據日活月活分析判斷我們的產品是否需要更進一步的推廣
- 假如當前日活月活穩定在一個正常值,但是增長緩慢,從這里或許我們就能得出,用戶增長穩定但增速變慢了,可以嘗試進一步推廣,擴大用戶規模
歸因分析
歸因事件分析(支付訂單 這個操作可能有多個條件導致 通過點擊某個運營位,點擊廣告位看到點進去購買 或者直接搜索) 一般使用首次歸因進行分析 同一個用戶購買一件商品歸因分析 先按時間將待歸因事件進行過濾 然后在取出第一個事件 得到的就是首次歸因 和業務人員以及前端人員一起商量
比如提交訂單這個事件
- 他可能有多種行為導致他提交訂單的操作
- 包括自己搜索呀, 點擊某個廣告位 點擊他人分享等 我們一般會和運營人員進行商討 選擇合適的歸因模型
- 比如選擇首次歸因 判斷用戶每次提交訂單的行為是從那個行為上最先感興趣的的
- 如果大部分都是某個廣告位點擊進行的 那我們就提高此廣告位的曝光率 以便提高訂單的轉換率
用戶活躍度
我們當時有一張日活表, 就是記錄了每天的活躍用戶
用戶活躍度分析
需求 : 求一個月內用戶連續登錄超過5天的人數 10 天的人數…
- 最簡單的方法也是最笨的方法就是連續join 效率極低
- 為此我當時開發了一個拉鏈表樣子的用戶連續活躍區間記錄表 表中字段大概就是 用戶首次登陸日期 用戶 連續活躍起始日 連續活躍結束日, 以此 再來開發最終報表就變得很容易了 而且根據這張表我們當時順帶的就將用戶留存分析也做完了 因為留存分析需求一般不是太多我們之前一般都是現算, 之前的一般都是拿之前天的日活表去join 需要查看的天的日活, join上的就是留存了的, 現在我們只需要查看連續活躍區間表中連續活躍結束日是9999 的就可以了
- 具體表的實現 :
- 使用 T-1 日的用戶連續活躍記錄表 fulljoin T 日的用戶日活表( 這張表記錄了每天的活躍用戶 )
- 分情況進行處理
- 用戶 t-1 為 9999 的且 當日活躍的 只需要修改連續活躍結束日
- 用戶 t-1 為 9999 但join不上的 說明當日沒有活躍 修改 9999為 t-1日
- 用戶 t-1 為 null t 不為null 的 使用 t日作為 首訪日期 連續活躍起始日 連續活躍結束日
- 用戶 t-1 結束日不為 9999 的不做任何改變
- 有一個問題是之前已經是閉區間的數據 需要新生成一條數據然后 union 到之前的表中
- 查詢出是閉區間的數據 和 日活表進行join 將join上數據取出 t-1日的 首訪日期 t日作為連續活躍起始日, 9999作為連續活躍結束日 和之前表 union到一起
- 用戶 t-1 為 9999 的且 當日活躍的 只需要修改連續活躍結束日
- 分情況進行處理
- 使用 T-1 日的用戶連續活躍記錄表 fulljoin T 日的用戶日活表( 這張表記錄了每天的活躍用戶 )
多維聚合bitmap
bitmap算法 就是保存在內存中的連續的二進制位, 用于對大量的整數型數據做去重和查詢操作 .
一開始并不是使用kylin來進行多維統計的使用的hive 的高階函數 with cube , 但是hive 的高階聚合函數在進行多維計算時有一些的指標的計算需要使用到count(distinct) , 會消耗很大的資源, 因此我們當時借鑒了bitmap的思想, 來對那些需要去重聚合的指標進行計算-
我們當時使用的是roaring bitmap 性能比較好, 它提供了方法將數據轉換成bitmap類型, 一條數據在bitmap中只占用一個bit位 , 存進去數據后它會將相應bit位的值置為 1 , 下次相同的數據再進來還是會進入到同一個bit位上 , 這樣就能實現去重 , 同時bitmap中還提供了計算bitmap中 1 的個數的方法, 這樣我們就可以將去重總數統計出來了.
-
同時我們還通過 bitmap 的 思想實現了層級聚合的操作 , 比如計算好 省市區 的數據之后在計算 省市 的維度我們就可以使用bitmap 中提供的or操作來進行計算
- 例 :
省 市 區 人
a b c [1001]
a b d [1100]
- 例 :
-
計算
- 那么如果要計算 a省b市 的人數 只需要將兩個數據進行or操作 就能得到的結果 [1 1 0 1]
- 因為hive中不支持bitmap類型 , 因此我們將數據轉換成binary類型存儲在hive中
- 通過自定義函數的方式實現了這些方法, 同時我們編寫sparkSQL 得到最終的結果
多維分析中的常用操作(上鉆 … ):
數據立方體中最常見的五大操作:切片Slice,切塊Dice,旋轉Pivot,上卷(也叫向上鉆取)Roll-up,下鉆Drill-down
- 下鉆Drill-down:向下從更細分的粒度(維度)來探索分析數據,如按照時間維度,按照天粒度來分析數據
- 改變維的層次,變換分析的粒度。從上層降到下一層,或者說是將匯總數據拆分到更細節的數據。比如通過對2010年第二季度的總銷售數據進行鉆取來查看2010年第二季度4、5、6每個月的消費數據,當然也可以鉆取浙江省來查看杭州市、寧波市、溫州市……這些城市的銷售數據。
- 上卷Roll-up: 向上從更粗的粒度(維度)來探索分析數據,比如時間維度,按照季度來分析數據
- 鉆取的逆操作,即從細粒度數據向高層的聚合,如將江蘇省、上海市和浙江省的銷售數據進行匯總來查看江浙滬地區的銷售數據。
- 切片Slice: 查詢某個維度等于某個指定值的數據集 比如按照產品種類等于電子產品的維度 來分析數據
- 選擇維中特定的值進行分析,比如只選擇電子產品的銷售數據,或者2010年第二季度的數據。
- 切塊Dice: 查詢某個維度等于某幾個指定值的數據集
- 選擇維中特定區間的數據或者某批特定值進行分析,比如選擇2010年第一季度到2010年第二季度的銷售數據,或者是電子產品和日用品的銷售數據。
- 旋轉Pivot:即維的位置的互換,就像是二維表的行列轉換,如通過旋轉實現產品維和地域維的互換。 旋轉 變換維度展現順序
用戶畫像
主要工作就是提取一些基礎指標 或者 給專門的做機器學習的同時提供點數據 了解過一點機器學習算法
用戶訂單畫像標簽表開發
- 標簽包括 用戶id 第一次下單時間 最后一次下單時間 一個月下單次數 一個月內下單總金額 最大訂單金額 最小訂單金額 平均訂單金額 常用支付方式 常購買物品品類 收貨地址…
- 需要使用到 訂單表 訂單商品明細表 都是從mysql中導入過來的表
- 將對應的字段取出
樸素貝葉斯
向量是一串數字
向量可以代表現實中某種事物的一系列特征和特征值
向量可以理解為:這串數字,基于原點,所指向的n維空間的,某個方向的,固定長度;最終代表的就是一個點
- 用戶行為性別預測
先經過大量統計得到一份經驗數據 ( 相對教準確的數據 ) 將這些數據經過向量化 調用API得到訓練模型將需要預測的數據也經過向量化 然后加載之前的訓練模型對數據進行預測分析得到最終結果
實時指標統計
ETL 實時新老用戶標記
判斷一條數據所屬用戶是新用戶還是老用戶
使用布隆過濾器 + Rocks DB
布隆過濾器 更加的節省空間
RocksDB 可以存儲更多狀態、有長窗口(window state)、key、value的可以保存更大的數據(2 G)同時可以實現增量checkpoint
實現方式 :
- 使用設備ID 作為用戶標識的, 進行keyBy, 設備ID相同的用戶會進入到同一個分區中 使用BloomFilter 來判斷是否為新用戶
- 然后我們定義了一個OperatorState 保存BloomFilter, 因為如果使用 keyedState 來保存 一個設備ID 就對應一個BloomFilter 浪費資源 需要實現CheckPointFunction , 這樣子一個分區就會擁有一個屬于自己分區的BloomFilter 節省資源 而且還不會數據傾斜
- 然后在實現的方法中使用 定義狀態 狀態里面保存了BloomFilter 來一條數據判斷該條數據的設備ID 是否已經存在, 存在就將該條數據置為老用戶, 同時我們使用了Rocks DB代替 stateBackEnd 來保存狀態, 這樣子就可以保存更多的狀態數據 , 同時還可以實現增量checkpoin
- RocksDB 的使用
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
- RocksDB 的使用
直播間人氣實時統計
規則 :
進入直播間超過一分鐘的人氣值 +1 30分內連續進入直播間的人氣值不變進入間隔超過30分的人氣值+1下一次進入直播間的時間 - 上一次離開直播間的時間 > 30分鐘 又算一個人氣值實現方式 :
- 按照主播ID, deviceID 進行 keyBy
- 使用進入直播間時間+1注冊定時器 如果下次出去時間小于一分鐘, 刪除定時器 如果定時器觸發 判斷上次 進去時間 上次時間為空 人氣值 +1 上次不為空且 這次進入- 上次出去 > 30 人氣值 +1
- 計算好的數據在使用直播間號進行keyBy 將數據進行sum聚合求得最后每個直播間的人氣值, 最后再將結果輸出至redis中, 實時展示
實時統計直播間 pv , uv , 實時在線人數, 以及不同維度下的指標
pv uv 實時在線人數使用 flink 計算然后批量寫入至redis中 編寫定時器批量攢數據
將處理好的數據按批次導入至click house中, 統計多維指標
實現方式 :
- 將直播數據按 直播間ID進行keyBy 同一個直播間的數據進入同一個分區的同一個組內
- 因為要進行uv , 所以使用 布隆過濾器 來進行去重, 同時定義 pv , uv , online , BloomFilter 四個狀態來保存數據
- 一條數據進來 判斷在布隆過濾器中是否存在,
- 不存在 那么 uv ++, pv ++ ,online ++
- 存在 pv ++ , online ++ 同時將數據加入至BloomFilter中
- 數據出去 online–
- 結果數據最后輸出至redis中實時展示, 因此我們定義了一個計算器10秒觸發一次, 將數據輸出至 redis 中 具體實現 :
- 使用當前時間 / 10秒 ==> 和10 的差值
- 當前時間 - 差值 +10秒就獲得了10秒后的時間
- 10內進入的數據每次都會生成一個定時器 但是定時器的時間都是一致的 指揮觸發一次
- 這樣子就實現了10秒觸發一次的定時器
直播期間各個主播直播收到的禮物分值計算
需要關聯禮物維表 , 根據禮物維表的數據計算對應禮物的分值
使用廣播狀態, 將業務庫中的禮物表數據廣播至狀態中, 然后connect關聯查詢
Flink中廣播的數據可以實現實時的更新
實現方式 :
- 將維度數據連接MySQL查詢出來 , 整理好之后將維度數據廣播到狀態里
- 將事實數據進行整理后 和 廣播的數據進行connect 關聯 然后調用 process 方法 , 因為需要和維度數據進行關聯處理 , 因此使用 BroadcastProcessFunction
- 需要重寫兩個方法 : processBroadcastElement 處理廣播數據 可以將廣播的數據進行更新 processElement 處理事實數據 可以讀取廣播數據 不能修改
- 關聯維度數據后將最終的數據輸出 , 按主播ID 進行keyBy 后進行sum操作, 再將最終的結果數據輸出值redis中
統計 : 10分鐘內, 每隔1分鐘統計一次各個分類、各種事件類型的熱門商品(商品ID)
例 :
[10:00] , [10:10] , 華為 , 瀏覽 , p10 , 20 [10:00] , [10:10] , 華為 , 加入購物車 , p10 , 20[10:00] , [10:10] ,華為 , 瀏覽 , p30 , 29 ... [10:00] , [10:10] , 華為 , 加入購物車 , p30 , 29 ...實現方式 :
- 先將數據進行keyBy(分類ID,事件ID,商品ID),劃分窗口 ( 使用滑動窗口)
- 然后對窗口內數據進行增量聚合(效率高,全局聚合效率低,而且占用大量資源)
- 我們在增量合并的時候除了需要獲得到:( 分類ID,事件ID,商品ID,次數 ), 還需要獲取窗口的信息(窗口的起始時間,結束時間)
- 因此增量合并使用 aggregate 方法 , 這樣能夠在增量聚合的同時定義一個窗口 , 窗口觸發后可以在這個窗口中獲取到窗口聚合后的數據,并且可以得到窗口的起始時間和結束時間 輸出結果為 : ( 分類ID,事件ID,商品ID,次數,窗口起始時間,結束時間 )。
- 將數據以(分類ID,事件ID,窗口起始時間,結束時間)進行keyBy , 然后進行排序 :使用ProcessFunction的onTimer定時器進行排序,每來一條數據,不直接輸出,而是將數據存儲到State(為了容錯),再注冊一個比當前窗口的結束時間還要大一毫秒的定時器。如果下一個窗口的數據觸發了,那么Water Mark已經大于了注冊的定時器的時間,上一個窗口的數據已經攢齊了,就可以排序然后輸出。
- 將最終結果進行整理后輸出至redis中
實時統計訂單相關指標
分析的實時指標 :直播間主播帶貨總金額、商品成交( 下單 )數量
- 直播間主播帶商品各個分類的成交金額、商品成交( 下單 ) 數量
- 一天內中的成交金額
- 各個分類成交金額(維度:省份、操作系統、手機型…)
實現方式 :
- 使用雙流 Join
- 將業務庫中的訂單主表和訂單明細表取出進行分析處理 , 業務數據使用canal讀取過來的 , 數據是一個大的JSON串 我要需要的是他里面 data 對應的 一個小的 JSON 串 將里面的數據封裝到一個bean 里
- 訂單主表:訂單ID、用戶ID、訂單狀態、訂單金額、下單時間、更新時間
- 訂單明細表 :訂單主表ID、sku、數量、單價、分類ID、直播間ID
- 由于業務系統可能存在延遲, 我們將訂單明細表劃窗口, 使用側流輸出獲取窗口中遲到的數據
- 將主流中業務訂單明細數據left join 訂單主表 關聯上的就輸出 tuple2 < > 關聯不上就將關聯不上的輸出為null
- 將join后的數據和之前側流輸出的數據進行union然后將關聯不上的和側流輸出的數據 也就是 主表數據為null的連接數據庫進行查詢
- 然后將最終的結果輸出到clickhouse中
總結
- 上一篇: K线图|K线图分析法简介 |K线图怎么看
- 下一篇: 建了个微信交流群,和我一起在 b 站学