日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用Spark Streaming SQL基于时间窗口进行数据统计

發布時間:2024/8/23 数据库 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Spark Streaming SQL基于时间窗口进行数据统计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.背景介紹

流式計算一個很常見的場景是基于事件時間進行處理,常用于檢測、監控、根據時間進行統計等系統中。比如埋點日志中每條日志記錄了埋點處操作的時間,或者業務系統中記錄了用戶操作時間,用于統計各種操作處理的頻率等,或者根據規則匹配,進行異常行為檢測或監控系統告警。這樣的時間數據都會包含在事件數據中,需要提取時間字段并根據一定的時間范圍進行統計或者規則匹配等。
使用Spark Streaming SQL可以很方便的對事件數據中的時間字段進行處理,同時Spark Streaming SQL提供的時間窗口函數可以將事件時間按照一定的時間區間對數據進行統計操作。
本文通過講解一個統計用戶在過去5秒鐘內點擊網頁次數的案例,介紹如何使用Spark Streaming SQL對事件時間進行操作。

2.時間窗語法說明

Spark Streaming SQL支持兩類窗口操作:滾動窗口(TUMBLING)和滑動窗口(HOPPING)。

2.1滾動窗口

滾動窗口(TUMBLING)根據每條數據的時間字段將數據分配到一個指定大小的窗口中進行操作,窗口以窗口大小為步長進行滑動,窗口之間不會出現重疊。例如:如果指定了一個5分鐘大小的滾動窗口,數據會根據時間劃分到?[0:00 - 0:05)、?[0:05, 0:10)、[0:10, 0:15)等窗口。

  • 語法
GROUP BY TUMBLING ( colName, windowDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操作,統計inv_quantity_on_hand的均值;窗口大小為1分鐘。

SELECT avg(inv_quantity_on_hand) qoh FROM inventory GROUP BY TUMBLING (inv_data_time, interval 1 minute)

2.2滑動窗口

滑動窗口(HOPPING),也被稱作Sliding Window。不同于滾動窗口,滑動窗口可以設置窗口滑動的步長,所以窗口可以重疊。滑動窗口有兩個參數:windowDuration和slideDuration。slideDuration為每次滑動的步長,windowDuration為窗口的大小。當slideDuration <?windowDuration時窗口會重疊,每個元素會被分配到多個窗口中。
所以,滾動窗口其實是滑動窗口的一種特殊情況,即slideDuration =?windowDuration則等同于滾動窗口。

  • 語法
GROUP BY HOPPING ( colName, windowDuration, slideDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操作,統計inv_quantity_on_hand的均值;窗口為1分鐘,滑動步長為30秒。

SELECT avg(inv_quantity_on_hand) qoh FROM inventory GROUP BY HOPPING (inv_data_time, interval 1 minute, interval 30 second)

3.系統架構

業務日志收集到Aliyun SLS后,Spark對接SLS,通過Streaming SQL對數據進行處理并將統計后的結果寫入HDFS中。后續的操作流程主要集中在Spark Streaming SQL接收SLS數據并寫入HDFS的部分,有關日志的采集請參考日志服務。

4.操作流程

4.1環境準備

  • 創建E-MapReduce?3.21.0以上版本的Hadoop集群。
  • 下載并編譯E-MapReduce-SDK包
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git cd aliyun-emapreduce-sdk git checkout -b master-2.x origin/master-2.x mvn clean package -DskipTests

編譯完后, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

4.2創建表

命令行啟動spark-sql客戶端

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

創建SLS和HDFS表

spark-sql> CREATE DATABASE IF NOT EXISTS default; spark-sql> USE default;-- 數據源表 spark-sql> CREATE TABLE IF NOT EXISTS sls_user_log USING loghub OPTIONS ( sls.project = "${logProjectName}", sls.store = "${logStoreName}", access.key.id = "${accessKeyId}", access.key.secret = "${accessKeySecret}", endpoint = "${endpoint}");--結果表 spark-sql> CREATE TABLE hdfs_user_click_count USING org.apache.spark.sql.json OPTIONS (path '${hdfsPath}');

4.3統計用戶點擊數

spark-sql>SET streaming.query.name=user_click_count; spark-sql>SET spark.sql.streaming.checkpointLocation.user_click_count=hdfs:///tmp/spark/sql/streaming/test/user_click_count; spark-sql>insert into hdfs_user_click_count select sum(cast(action_click as int)) as click, userId, window from sls_user_log where delay(__time__)<"1 minute" group by TUMBLING(__time__, interval 5 second), userId;

其中,內建函數delay()用來設置Streaming SQL中的watermark,后續會有專門的文章介紹Streaming SQL watermark的相關內容。

4.4查看結果

可以看到,產生的結果會自動生成一個window列,包含窗口的起止時間信息。

5.結語

本文簡要介紹了流式處理中基于事件時間進行處理的場景,以及Spark Streaming SQL時間窗口的相關內容,并通過一個簡單案例介紹了時間窗口的使用。后續文章,我將介紹Spark Streaming SQL的更多內容。


原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。

總結

以上是生活随笔為你收集整理的使用Spark Streaming SQL基于时间窗口进行数据统计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。