指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计
作者:吳云濤,騰訊 CSIG 高級工程師
導語 | 最近梳理了一下如何用 Flink 來實現實時的 UV、PV 指標的統計,并和公司內微視部門的同事交流。然后針對該場景做了簡化,并發現使用 Flink SQL 來 實現這些指標的統計會更加便捷。
一、解決方案描述
1.1 概述
本方案結合本地自建 Kafka 集群、騰訊云流計算 Oceanus(Flink)、云數據庫 Redis 對博客、購物等網站 UV、PV 指標進行實時可視化分析。分析指標包含網站的獨立訪客數量(UV )、產品的點擊量(PV)、轉化率(轉化率 = 成交次數 / 點擊量)等。
相關概念介紹:UV(Unique Visitor):獨立訪客數量。訪問您網站的一臺客戶端為一個訪客,如用戶對同一頁面訪問了 5 次,那么該頁面的 UV 只加 1,因為 UV 統計的是去重后的用戶數而不是訪問次數。PV(Page View):點擊量或頁面瀏覽量。如用戶對同一頁面訪問了 5 次,那么該頁面的 PV 會加 5。
1.2 方案架構及優勢
根據以上實時指標統計場景,設計了如下架構圖:
涉及產品列表:
本地數據中心(IDC)的自建 Kafka 集群
私有網絡 VPC
專線接入/云聯網/VPN連接/對等連接
流計算 Oceanus (Flink)
云數據庫 Redis
二、前置準備
購買所需的騰訊云資源,并打通網絡。自建的 Kafka 集群需根據集群所在區域需采用 VPN 連接、專線連接或對等連接的方式來實現網絡互通互聯。
2.1 創建私有網絡 VPC
私有網絡(VPC)是一塊在騰訊云上自定義的邏輯隔離網絡空間,在構建 Oceanus 集群、Redis 組件等服務時選擇的網絡建議選擇同一個 VPC,網絡才能互通。否則需要使用對等連接、NAT 網關、VPN 等方式打通網絡。私有網絡創建步驟請參考幫助文檔(https://cloud.tencent.com/document/product/215/36515)。
2.2?創建 Oceanus 集群
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基于 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平臺。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
在 Oceanus 控制臺的【集群管理】->【新建集群】頁面創建集群,選擇地域、可用區、VPC、日志、存儲,設置初始密碼等。VPC 及子網使用剛剛創建好的網絡。創建完后 Flink 的集群如下:
2.3?創建 Redis 集群
在Redis 控制臺(https://console.cloud.tencent.com/redis#/)的【新建實例】頁面創建集群,選擇與其他組件同一地域,同區域的同一私有網絡 VPC,這里還選擇同一子網。
2.4 配置自建 Kafka 集群
2.4.1 修改自建 Kafka 集群配置
自建 Kafka 集群連接時 bootstrap-servers 參數常常使用 hostname 而不是 ip 來連接。但用自建 Kafka 集群連接騰訊云上的 Oceanus 集群為全托管集群, Oceanus 集群的節點上無法解析自建集群的 hostname 與 ip 的映射關系,所以需要改監聽器地址由 hostname 為 ip 地址連接的形式。
將 config/server.properties 配置文件中 advertised.listeners 參數配置為IP地址。示例:
# 0.10.X及以后版本 advertised.listeners=PLAINTEXT://10.1.0.10:9092 # 0.10.X之前版本 advertised.host.name=PLAINTEXT://10.1.0.10:9092修改后重啟 Kafka 集群。
! 若在云上使用到自建的zookeeper地址,也需要將zk配置中的hostname修改IP地址形式。
2.4.2 模擬發送數據到topic
本案例使用topic為topic為 uvpv-demo。
1)Kafka 客戶端
進入自建 Kafka 集群節點,啟動 Kafka 客戶端,模擬發送數據。
./bin/kafka-console-producer.sh?--broker-list?10.1.0.10:9092?--topic?uvpv-demo >{"record_type":0,?"user_id":?2,?"client_ip":?"100.0.0.2",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"} >{"record_type":0,?"user_id":?3,?"client_ip":?"100.0.0.3",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"} >{"record_type":1,?"user_id":?2,?"client_ip":?"100.0.0.1",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"}2)使用腳本發送
腳本一:Java 代碼參考:https://cloud.tencent.com/document/product/597/54834
腳本二:Python 腳本。參考之前案例中 python 腳本進行適當修改即可:
《視頻直播:實時數據可視化分析》
2.5 打通自建 IDC 集群到騰訊云網絡通信
自建 Kafka 集群聯通騰訊云網絡,可通過以下前 3 種方式打通自建 IDC 到騰訊云的網絡通信。
專線接入
https://cloud.tencent.com/document/product/216
適用于本地數據中心 IDC 與騰訊云網絡打通。
云聯網
https://cloud.tencent.com/document/product/877
適用于本地數據中心 IDC 與騰訊云網絡打通,也可用于云上不同地域間私有網絡 VPC 打通。
VPN連接
https://cloud.tencent.com/document/product/554
適用于本地數據中心 IDC 與騰訊云網絡打通。
對等連接+ NAT網關
對等連接:https://cloud.tencent.com/document/product/553
NAT網關:https://cloud.tencent.com/document/product/552
適合云上不同地域間私有網絡 VPC 打通,不適合本地 IDC 到騰訊云網絡。
本方案中使用了 VPN 連接的方式,實現本地 IDC 和云上網絡的通信。參考鏈接:
建立 VPC 到 IDC 的連接(路由表)(https://cloud.tencent.com/document/product/554/52854)
根據方案繪制了下面的網絡架構圖:
三 方案實現
3.1 業務目標
利用流計算 Oceanus 實現網站 UV、PV、轉化率指標的實時統計,這里只列取以下3種統計指標:
網站的獨立訪客數量 UV。Oceanus 處理后在 Redis 中通過 set 類型存儲獨立訪客數量,同時也達到了對同一訪客的數據去重的目的。
網站商品頁面的點擊量 PV。Oceanus 處理后在 Redis 中使用 list 類型存儲頁面點擊量。
轉化率(轉化率 = 成交次數 / 點擊量)。Oceanus 處理后在 Redis 中用 String 存儲即可。
3.2 源數據格式
Kafka topic:uvpv-demo(瀏覽記錄)
| 字段 | 類型 | 含義 |
record_type | int | 客戶號 |
user_id | varchar | 客戶ip地址 |
client_ip | varchar | 房間號 |
product_id | Int | 進入房間時間 |
create_time | timestamp | 創建時間 |
Kafka 內部采用 json 格式存儲,數據格式如下:
# 瀏覽記錄 {"record_type":0, ?# 0 表示瀏覽記錄"user_id":?6,"client_ip":?"100.0.0.6","product_id":?101,"create_time":?"2021-09-06 16:00:00" }# 購買記錄 {"record_type":1,?# 1 表示購買記錄"user_id":?6,"client_ip":?"100.0.0.8","product_id":?101,"create_time":?"2021-09-08 18:00:00" }3.3 編寫 Flink SQL 作業
示例中實現了 UV、PV 和轉化率3個指標的獲取邏輯,并寫入 Sink 端。
1、定義 Source
CREATE?TABLE?`input_web_record` (`record_type`?INT,`user_id`?INT,`client_ip`?VARCHAR,`product_id`?INT,`create_time`?TIMESTAMP,`times`?AS?create_time,WATERMARK FOR times?AS?times - INTERVAL?'10'?MINUTE ) WITH ('connector'?=?'kafka', ??-- 可選 'kafka','kafka-0.11'. 注意選擇對應的內置 Connector'topic'?=?'uvpv-demo', ?'scan.startup.mode'?=?'earliest-offset',--'properties.bootstrap.servers' = '82.157.27.147:9092','properties.bootstrap.servers'?=?'10.1.0.10:9092', ?'properties.group.id'?=?'WebRecordGroup', ?-- 必選參數, 一定要指定 Group ID'format'?=?'json','json.ignore-parse-errors'?=?'true', ? ??-- 忽略 JSON 結構解析異常'json.fail-on-missing-field'?=?'false'???-- 如果設置為 true, 則遇到缺失字段會報錯 設置為 false 則缺失字段設置為 null );2、定義 Sink
-- UV sink CREATE?TABLE?`output_uv` ( ? `userids` ? STRING, `user_id` STRING ) WITH ('connector'?=?'redis', ? ? ? ? ?'command'?=?'sadd', ? ? ? ? ? ? ?-- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379', ?-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', ? -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );-- PV sink CREATE?TABLE?`output_pv` ( ? `pagevisits` ? STRING, `product_id` STRING, `hour_count`?BIGINT ) WITH ('connector'?=?'redis', ? ? ? ? ?'command'?=?'lpush', ? ? ? ? ? ? ?-- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379', ??-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', ? -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );-- 轉化率 sink CREATE?TABLE?`output_conversion_rate` ( ? `conversion_rate` ? STRING, `rate` STRING ) WITH ('connector'?=?'redis', ? ? ? ?'command'?=?'set', ? ? ? ? ? ? ?-- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379',?-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );3、業務邏輯
-- 加工得到 UV 指標,統計所有時間內的 UV INSERT?INTO?output_uv SELECT'userids'?AS?`userids`, CAST(user_id?AS?string)?AS?user_id FROM?input_web_record ;-- 加工并得到 PV 指標,統計每 10 分鐘內的 PV INSERT?INTO?output_pv SELECT'pagevisits'?AS?pagevisits, CAST(product_id?AS?string)?AS?product_id, SUM(product_id)?AS?hour_count FROM?input_web_record?WHERE?record_type =?0 GROUP?BY HOP(times, INTERVAL?'5'?MINUTE, INTERVAL?'10'?MINUTE), product_id, user_id;-- 加工并得到轉化率指標,統計每 10 分鐘內的轉化率 INSERT?INTO?output_conversion_rate SELECT'conversion_rate'?AS?conversion_rate, CAST( (((SELECT?COUNT(1)?FROM?input_web_record?WHERE?record_type=0)*1.0)/SUM(a.product_id))?as?string) FROM?(SELECT?*?FROM?input_web_record?where?record_type =?1)?AS?a GROUP?BY?? HOP(times, INTERVAL?'5'?MINUTE, INTERVAL?'10'?MINUTE), product_id;3.4 結果驗證
通常情況,會通過 Web 網站來展示統計到的 UV、PV 指標,這里為了簡單直接在Redis 控制臺(https://console.cloud.tencent.com/redis#/)登錄進行查詢:
userids: 存儲 UV
pagevisits: 存儲 PV
conversion_rate: 存儲轉化率,即購買商品次數/總頁面點擊量。
四 總結
通過自建 Kafka 集群采集數據,在流計算 Oceanus (Flink) 中實時進行字段累加、窗口聚合等操作,將加工后的數據存儲在云數據庫Redis,統計到實時刷新的 UV、PV 等指標。這個方案在 Kafka json 格式設計時為了簡便易懂做了簡化處理,將瀏覽記錄和產品購買記錄都放在了同一個 topic 中,重點通過打通自建 IDC 和騰訊云產品間的網絡來展現整個方案。針對超大規模的 UV 去重,微視的同事采用了 Redis hyperloglog 方式來實現 UV 統計。相比直接使用 set 類型方式有極小的內存空間占用的優點,詳情見鏈接:https://cloud.tencent.com/developer/article/1889162。
流計算 Oceanus?限量秒殺專享活動火爆進行中↓↓
點擊文末「閱讀原文」,了解騰訊云流計算 Oceanus 更多信息~
騰訊云大數據
長按二維碼
關注我們
總結
以上是生活随笔為你收集整理的指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 双双拿下赛道全部指标最佳 | 腾讯新一代
- 下一篇: TencentOCR 斩获 ICDAR