日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

發布時間:2024/2/28 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作者:吳云濤,騰訊 CSIG 高級工程師

導語 | 最近梳理了一下如何用 Flink 來實現實時的 UV、PV 指標的統計,并和公司內微視部門的同事交流。然后針對該場景做了簡化,并發現使用 Flink SQL 來 實現這些指標的統計會更加便捷。

一、解決方案描述

1.1 概述

本方案結合本地自建 Kafka 集群、騰訊云流計算 Oceanus(Flink)、云數據庫 Redis 對博客、購物等網站 UV、PV 指標進行實時可視化分析。分析指標包含網站的獨立訪客數量(UV )、產品的點擊量(PV)、轉化率(轉化率 = 成交次數 / 點擊量)等。

相關概念介紹:UV(Unique Visitor):獨立訪客數量。訪問您網站的一臺客戶端為一個訪客,如用戶對同一頁面訪問了 5 次,那么該頁面的 UV 只加 1,因為 UV 統計的是去重后的用戶數而不是訪問次數。PV(Page View):點擊量或頁面瀏覽量。如用戶對同一頁面訪問了 5 次,那么該頁面的 PV 會加 5。

1.2 方案架構及優勢

根據以上實時指標統計場景,設計了如下架構圖:

涉及產品列表:

  • 本地數據中心(IDC)的自建 Kafka 集群

  • 私有網絡 VPC

  • 專線接入/云聯網/VPN連接/對等連接

  • 流計算 Oceanus (Flink)

  • 云數據庫 Redis

二、前置準備

購買所需的騰訊云資源,并打通網絡。自建的 Kafka 集群需根據集群所在區域需采用 VPN 連接、專線連接或對等連接的方式來實現網絡互通互聯。

2.1 創建私有網絡 VPC

私有網絡(VPC)是一塊在騰訊云上自定義的邏輯隔離網絡空間,在構建 Oceanus 集群、Redis 組件等服務時選擇的網絡建議選擇同一個 VPC,網絡才能互通。否則需要使用對等連接、NAT 網關、VPN 等方式打通網絡。私有網絡創建步驟請參考幫助文檔(https://cloud.tencent.com/document/product/215/36515)。

2.2?創建 Oceanus 集群

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基于 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平臺。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

在 Oceanus 控制臺的【集群管理】->【新建集群】頁面創建集群,選擇地域、可用區、VPC、日志、存儲,設置初始密碼等。VPC 及子網使用剛剛創建好的網絡。創建完后 Flink 的集群如下:


2.3?創建 Redis 集群

在Redis 控制臺(https://console.cloud.tencent.com/redis#/)的【新建實例】頁面創建集群,選擇與其他組件同一地域,同區域的同一私有網絡 VPC,這里還選擇同一子網。

2.4 配置自建 Kafka 集群

2.4.1 修改自建 Kafka 集群配置

自建 Kafka 集群連接時 bootstrap-servers 參數常常使用 hostname 而不是 ip 來連接。但用自建 Kafka 集群連接騰訊云上的 Oceanus 集群為全托管集群, Oceanus 集群的節點上無法解析自建集群的 hostname 與 ip 的映射關系,所以需要改監聽器地址由 hostname 為 ip 地址連接的形式。

將 config/server.properties 配置文件中 advertised.listeners 參數配置為IP地址。示例:

# 0.10.X及以后版本 advertised.listeners=PLAINTEXT://10.1.0.10:9092 # 0.10.X之前版本 advertised.host.name=PLAINTEXT://10.1.0.10:9092

修改后重啟 Kafka 集群。

! 若在云上使用到自建的zookeeper地址,也需要將zk配置中的hostname修改IP地址形式。

2.4.2 模擬發送數據到topic

本案例使用topic為topic為 uvpv-demo。

1)Kafka 客戶端

進入自建 Kafka 集群節點,啟動 Kafka 客戶端,模擬發送數據。

./bin/kafka-console-producer.sh?--broker-list?10.1.0.10:9092?--topic?uvpv-demo >{"record_type":0,?"user_id":?2,?"client_ip":?"100.0.0.2",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"} >{"record_type":0,?"user_id":?3,?"client_ip":?"100.0.0.3",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"} >{"record_type":1,?"user_id":?2,?"client_ip":?"100.0.0.1",?"product_id":?101,?"create_time":?"2021-09-08 16:20:00"}

2)使用腳本發送

腳本一:Java 代碼參考:https://cloud.tencent.com/document/product/597/54834

腳本二:Python 腳本。參考之前案例中 python 腳本進行適當修改即可:

《視頻直播:實時數據可視化分析》

2.5 打通自建 IDC 集群到騰訊云網絡通信

自建 Kafka 集群聯通騰訊云網絡,可通過以下前 3 種方式打通自建 IDC 到騰訊云的網絡通信。

  • 專線接入

    https://cloud.tencent.com/document/product/216

    適用于本地數據中心 IDC 與騰訊云網絡打通。

  • 云聯網

    https://cloud.tencent.com/document/product/877

    適用于本地數據中心 IDC 與騰訊云網絡打通,也可用于云上不同地域間私有網絡 VPC 打通。

  • VPN連接

    https://cloud.tencent.com/document/product/554

    適用于本地數據中心 IDC 與騰訊云網絡打通。

  • 對等連接+ NAT網關

    對等連接:https://cloud.tencent.com/document/product/553

    NAT網關:https://cloud.tencent.com/document/product/552

    適合云上不同地域間私有網絡 VPC 打通,不適合本地 IDC 到騰訊云網絡。

本方案中使用了 VPN 連接的方式,實現本地 IDC 和云上網絡的通信。參考鏈接:

建立 VPC 到 IDC 的連接(路由表)(https://cloud.tencent.com/document/product/554/52854)

根據方案繪制了下面的網絡架構圖:

三 方案實現

3.1 業務目標

利用流計算 Oceanus 實現網站 UV、PV、轉化率指標的實時統計,這里只列取以下3種統計指標:

  • 網站的獨立訪客數量 UV。Oceanus 處理后在 Redis 中通過 set 類型存儲獨立訪客數量,同時也達到了對同一訪客的數據去重的目的。

  • 網站商品頁面的點擊量 PV。Oceanus 處理后在 Redis 中使用 list 類型存儲頁面點擊量。

  • 轉化率(轉化率 = 成交次數 / 點擊量)。Oceanus 處理后在 Redis 中用 String 存儲即可。

3.2 源數據格式

Kafka topic:uvpv-demo(瀏覽記錄)

字段
類型
含義

record_type

int

客戶號

user_id

varchar

客戶ip地址

client_ip

varchar

房間號

product_id

Int

進入房間時間

create_time

timestamp

創建時間

Kafka 內部采用 json 格式存儲,數據格式如下:

# 瀏覽記錄 {"record_type":0, ?# 0 表示瀏覽記錄"user_id":?6,"client_ip":?"100.0.0.6","product_id":?101,"create_time":?"2021-09-06 16:00:00" }# 購買記錄 {"record_type":1,?# 1 表示購買記錄"user_id":?6,"client_ip":?"100.0.0.8","product_id":?101,"create_time":?"2021-09-08 18:00:00" }

3.3 編寫 Flink SQL 作業

示例中實現了 UV、PV 和轉化率3個指標的獲取邏輯,并寫入 Sink 端。

1、定義 Source

CREATE?TABLE?`input_web_record` (`record_type`?INT,`user_id`?INT,`client_ip`?VARCHAR,`product_id`?INT,`create_time`?TIMESTAMP,`times`?AS?create_time,WATERMARK FOR times?AS?times - INTERVAL?'10'?MINUTE ) WITH ('connector'?=?'kafka', ??-- 可選 'kafka','kafka-0.11'. 注意選擇對應的內置 Connector'topic'?=?'uvpv-demo', ?'scan.startup.mode'?=?'earliest-offset',--'properties.bootstrap.servers' = '82.157.27.147:9092','properties.bootstrap.servers'?=?'10.1.0.10:9092', ?'properties.group.id'?=?'WebRecordGroup', ?-- 必選參數, 一定要指定 Group ID'format'?=?'json','json.ignore-parse-errors'?=?'true', ? ??-- 忽略 JSON 結構解析異常'json.fail-on-missing-field'?=?'false'???-- 如果設置為 true, 則遇到缺失字段會報錯 設置為 false 則缺失字段設置為 null );

2、定義 Sink

-- UV sink CREATE?TABLE?`output_uv` ( ? `userids` ? STRING, `user_id` STRING ) WITH ('connector'?=?'redis', ? ? ? ? ?'command'?=?'sadd', ? ? ? ? ? ? ?-- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379', ?-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', ? -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );-- PV sink CREATE?TABLE?`output_pv` ( ? `pagevisits` ? STRING, `product_id` STRING, `hour_count`?BIGINT ) WITH ('connector'?=?'redis', ? ? ? ? ?'command'?=?'lpush', ? ? ? ? ? ? ?-- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379', ??-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', ? -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );-- 轉化率 sink CREATE?TABLE?`output_conversion_rate` ( ? `conversion_rate` ? STRING, `rate` STRING ) WITH ('connector'?=?'redis', ? ? ? ?'command'?=?'set', ? ? ? ? ? ? ?-- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes'?=?'192.28.28.217:6379',?-- redis連接地址,集群模式多個節點使用'',''分隔。-- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設置。'password'?=?'yourpassword'?? );

3、業務邏輯

-- 加工得到 UV 指標,統計所有時間內的 UV INSERT?INTO?output_uv SELECT'userids'?AS?`userids`, CAST(user_id?AS?string)?AS?user_id FROM?input_web_record ;-- 加工并得到 PV 指標,統計每 10 分鐘內的 PV INSERT?INTO?output_pv SELECT'pagevisits'?AS?pagevisits, CAST(product_id?AS?string)?AS?product_id, SUM(product_id)?AS?hour_count FROM?input_web_record?WHERE?record_type =?0 GROUP?BY HOP(times, INTERVAL?'5'?MINUTE, INTERVAL?'10'?MINUTE), product_id, user_id;-- 加工并得到轉化率指標,統計每 10 分鐘內的轉化率 INSERT?INTO?output_conversion_rate SELECT'conversion_rate'?AS?conversion_rate, CAST( (((SELECT?COUNT(1)?FROM?input_web_record?WHERE?record_type=0)*1.0)/SUM(a.product_id))?as?string) FROM?(SELECT?*?FROM?input_web_record?where?record_type =?1)?AS?a GROUP?BY?? HOP(times, INTERVAL?'5'?MINUTE, INTERVAL?'10'?MINUTE), product_id;

3.4 結果驗證

通常情況,會通過 Web 網站來展示統計到的 UV、PV 指標,這里為了簡單直接在Redis 控制臺(https://console.cloud.tencent.com/redis#/)登錄進行查詢:

userids: 存儲 UV

pagevisits: 存儲 PV

conversion_rate: 存儲轉化率,即購買商品次數/總頁面點擊量。

四 總結

通過自建 Kafka 集群采集數據,在流計算 Oceanus (Flink) 中實時進行字段累加、窗口聚合等操作,將加工后的數據存儲在云數據庫Redis,統計到實時刷新的 UV、PV 等指標。這個方案在 Kafka json 格式設計時為了簡便易懂做了簡化處理,將瀏覽記錄和產品購買記錄都放在了同一個 topic 中,重點通過打通自建 IDC 和騰訊云產品間的網絡來展現整個方案。針對超大規模的 UV 去重,微視的同事采用了 Redis hyperloglog 方式來實現 UV 統計。相比直接使用 set 類型方式有極小的內存空間占用的優點,詳情見鏈接:https://cloud.tencent.com/developer/article/1889162。

流計算 Oceanus?限量秒殺專享活動火爆進行中↓↓

點擊文末「閱讀原文」,了解騰訊云流計算 Oceanus 更多信息~

騰訊云大數據

長按二維碼
關注我們

總結

以上是生活随笔為你收集整理的指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。