
Spark Project in Practice: E-Commerce Analysis Platform, Per-Range Session Step-Length and Visit-Length Ratio Statistics (Requirement 1)


  • Project basic info, architecture, and requirements at a glance
  • Overview of the per-range session step-length and visit-length ratio statistics
  • Simplified execution flow
  • Code implementation
  • Summary

  • 1. Project basic info, architecture, and requirements at a glance


    See the first article in this series: Spark Project in Practice: E-Commerce Analysis Platform, Project Overview.

    The code is on GitHub:
    Starter code: https://github.com/githubIMrLi/spark-commerce_basic
    Complete code: https://github.com/githubIMrLi/spark-commerce


    2. Overview of the per-range session step-length and visit-length ratio statistics


  • Visit length: the difference between the earliest and latest action times in a session.
  • Step length: the number of actions in a session.
  • Among the sessions that match the filter conditions, compute the proportion whose visit length falls in each of the ranges 1s~3s, 4s~6s, 7s~9s, 10s~30s, 30s~60s, 1m~3m, 3m~10m, 10m~30m, and over 30m, and the proportion whose step length falls in each of the ranges 1~3, 4~6, 7~9, 10~30, 30~60, and over 60.
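
    For example, a session whose earliest action is at 13:05:10 and whose latest is at 13:05:55 has a visit length of 45s and lands in the 30s~60s bucket; if that session contains five actions, its step length is 5 and it lands in the 4~6 bucket. Each reported ratio is simply a bucket's session count divided by the total number of sessions that passed the filter.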

  • 3. Simplified execution flow of the per-range session step-length and visit-length ratio statistics
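
    In outline, the job proceeds as follows (the code in section 4 implements each step):

    1. Read the user_visit_action rows that fall within the task's date range into an RDD.
    2. Group the actions by session_id and aggregate each session's visit length, step length, search keywords, and clicked category ids.
    3. Join the per-session aggregates with user_info to attach age, profession, sex, and city.
    4. Filter the sessions against the task parameters; for each session that passes, update a custom accumulator with the total session count and the matching visit-length and step-length buckets.
    5. Compute each bucket's ratio from the accumulator values and append one result row to MySQL.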



    4. Code implementation


  • First run commerce_basic\mock\src\main\scala\MockDataGenerate.scala to generate the required tables (the job below reads user_visit_action and user_info).

  • Per-range session step-length and visit-length ratio statistics code:
import java.util.{Date, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

object SessionStatisticAgg {

  def main(args: Array[String]): Unit = {
    // Read the query's filter conditions from the task configuration.
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    val taskParam = JSONObject.fromObject(jsonStr)

    // A globally unique primary key for this task run.
    val taskUUID = UUID.randomUUID().toString

    // Create the SparkConf and SparkSession.
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getActionRDD(sparkSession, taskParam)

    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // sessionId2GroupRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val sessionId2GroupRDD = sessionId2ActionRDD.groupByKey()

    // sparkSession.sparkContext.setCheckpointDir(...)
    sessionId2GroupRDD.cache()
    // sessionId2GroupRDD.checkpoint()
    sessionId2GroupRDD.foreach(println(_)) // debug output

    // Build the per-session aggregate info string, joined with user info.
    val sessionId2FullInfoRDD = getFullInfoData(sparkSession, sessionId2GroupRDD)

    // Create and register the custom accumulator.
    val sessionStatAccumulator = new SessionStatAccumulator
    sparkSession.sparkContext.register(sessionStatAccumulator, "sessionAccumulator")

    // Filter the session data, updating the accumulator along the way.
    val sessionId2FilterRDD = getFilteredData(taskParam, sessionStatAccumulator, sessionId2FullInfoRDD)
    sessionId2FilterRDD.foreach(println(_)) // debug output

    for ((k, v) <- sessionStatAccumulator.value) {
      println("k=" + k + ", value=" + v)
    }

    // Compute the final statistics and persist them.
    getFinalData(sparkSession, taskUUID, sessionStatAccumulator.value)
  }

  def getFinalData(sparkSession: SparkSession,
                   taskUUID: String,
                   value: mutable.HashMap[String, Int]) = {
    // Total number of sessions that passed the filter (default 1 avoids division by zero).
    val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble

    // Session counts per visit-length range.
    val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
    val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
    val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
    val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
    val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
    val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
    val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
    val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
    val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)

    // Session counts per step-length range.
    val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)
    val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)
    val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)
    val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)
    val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)
    val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)

    // Ratios, rounded to two decimal places.
    val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)
    val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)
    val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)
    val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)
    val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)
    val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)
    val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)
    val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)
    val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)

    val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)
    val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)
    val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)
    val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)
    val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)
    val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)

    val stat = SessionAggrStat(taskUUID, session_count.toInt,
      visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
      visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
      visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
      step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
      step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)

    val statRDD = sparkSession.sparkContext.makeRDD(Array(stat))

    import sparkSession.implicits._
    statRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_ration_0308")
      .mode(SaveMode.Append)
      .save()
  }

  def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatAccumulator) = {
    if (visitLength >= 1 && visitLength <= 3) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
    } else if (visitLength >= 4 && visitLength <= 6) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
    } else if (visitLength >= 7 && visitLength <= 9) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
    } else if (visitLength >= 10 && visitLength <= 30) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
    } else if (visitLength > 30 && visitLength <= 60) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
    } else if (visitLength > 60 && visitLength <= 180) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
    } else if (visitLength > 180 && visitLength <= 600) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
    } else if (visitLength > 600 && visitLength <= 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
    } else if (visitLength > 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
    }
  }

  def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    if (stepLength >= 1 && stepLength <= 3) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
    } else if (stepLength >= 4 && stepLength <= 6) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
    } else if (stepLength >= 7 && stepLength <= 9) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
    } else if (stepLength >= 10 && stepLength <= 30) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
    } else if (stepLength > 30 && stepLength <= 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
    } else if (stepLength > 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
    }
  }

  def getFilteredData(taskParam: JSONObject,
                      sessionStatAccumulator: SessionStatAccumulator,
                      sessionId2FullInfoRDD: RDD[(String, String)]) = {
    val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
    val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
    val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
    val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
    val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
    val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
    val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)

    // Concatenate all non-null conditions into one "key=value|key=value" string.
    var filterInfo = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
      (if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
      (if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +
      (if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +
      (if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +
      (if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +
      (if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")

    // Drop the trailing separator when the last condition is absent.
    // (The original checked endsWith("\\|"), i.e. a literal backslash-pipe, which never matches.)
    if (filterInfo.endsWith("|"))
      filterInfo = filterInfo.substring(0, filterInfo.length - 1)

    val sessionId2FilterRDD = sessionId2FullInfoRDD.filter {
      case (sessionId, fullInfo) =>
        var success = true
        if (!ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo,
          Constants.PARAM_START_AGE, Constants.PARAM_END_AGE))
          success = false
        if (!ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS))
          success = false
        if (!ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES))
          success = false
        if (!ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX))
          success = false
        if (!ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS))
          success = false
        if (!ValidUtils.in(fullInfo, Constants.FIELD_CATEGORY_ID, filterInfo, Constants.PARAM_CATEGORY_IDS))
          success = false

        // For every session that passes, bump the total count and its two buckets.
        if (success) {
          sessionStatAccumulator.add(Constants.SESSION_COUNT)
          val visitLength = StringUtils.getFieldFromConcatString(
            fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
          val stepLength = StringUtils.getFieldFromConcatString(
            fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
          calculateVisitLength(visitLength, sessionStatAccumulator)
          calculateStepLength(stepLength, sessionStatAccumulator)
        }
        success
    }
    sessionId2FilterRDD
  }

  def getFullInfoData(sparkSession: SparkSession,
                      sessionId2GroupRDD: RDD[(String, Iterable[UserVisitAction])]) = {
    val userId2AggrInfoRDD = sessionId2GroupRDD.map {
      case (sid, iterableAction) =>
        var startTime: Date = null
        var endTime: Date = null
        var userId = -1L
        val searchKeywords = new StringBuffer("")
        val clickCategories = new StringBuilder("")
        var stepLength = 0

        for (action <- iterableAction) {
          if (userId == -1) {
            userId = action.user_id
          }
          val actionTime = DateUtils.parseTime(action.action_time)
          if (startTime == null || startTime.after(actionTime))
            startTime = actionTime
          if (endTime == null || endTime.before(actionTime))
            endTime = actionTime

          val searchKeyword = action.search_keyword
          val clickCategory = action.click_category_id
          if (StringUtils.isNotEmpty(searchKeyword) &&
            !searchKeywords.toString.contains(searchKeyword))
            searchKeywords.append(searchKeyword + ",")
          // Compare as strings: contains(clickCategory) on a Long would treat the
          // string as a Seq[Char] and never match, letting duplicates slip through.
          if (clickCategory != -1 &&
            !clickCategories.toString.contains(clickCategory.toString))
            clickCategories.append(clickCategory + ",")
          stepLength += 1
        }

        val searchKw = StringUtils.trimComma(searchKeywords.toString)
        val clickCg = StringUtils.trimComma(clickCategories.toString)
        // Visit length: latest action time minus earliest action time, in seconds.
        val visitLength = (endTime.getTime - startTime.getTime) / 1000

        val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sid + "|" +
          Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
          Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
          Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
          Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
          Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

        (userId, aggrInfo)
    }

    val sql = "select * from user_info"

    import sparkSession.implicits._
    // sparkSession.sql(sql)                          : DataFrame, i.e. Dataset[Row]
    // sparkSession.sql(sql).as[UserInfo]             : Dataset[UserInfo]
    // sparkSession.sql(sql).as[UserInfo].rdd         : RDD[UserInfo]
    // ....rdd.map(item => (item.user_id, item))      : RDD[(user_id, UserInfo)]
    val userInfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

    userId2AggrInfoRDD.join(userInfoRDD).map {
      case (userId, (aggrInfo, userInfo)) =>
        val age = userInfo.age
        val professional = userInfo.professional
        val sex = userInfo.sex
        val city = userInfo.city

        val fullInfo = aggrInfo + "|" + Constants.FIELD_AGE + "=" + age + "|" +
          Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
          Constants.FIELD_SEX + "=" + sex + "|" +
          Constants.FIELD_CITY + "=" + city

        val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
        (sessionId, fullInfo)
    }
  }

  def getActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
    val sql = "select * from user_visit_action where date>='" + startDate +
      "' and date<='" + endDate + "'"

    import sparkSession.implicits._
    // sparkSession.sql(sql)                     : DataFrame, i.e. Dataset[Row]
    // sparkSession.sql(sql).as[UserVisitAction] : Dataset[UserVisitAction]
    sparkSession.sql(sql).as[UserVisitAction].rdd
  }
}
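
A note on the design: all bucket counts are collected by a single custom accumulator during the filter pass itself, instead of running one separate count job per bucket, so the relatively expensive groupByKey and join over the action data are executed only once.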
  • Custom accumulator code:
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

class SessionStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

  val countMap = new mutable.HashMap[String, Int]()

  override def isZero: Boolean = countMap.isEmpty

  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val acc = new SessionStatAccumulator
    acc.countMap ++= this.countMap
    acc
  }

  override def reset(): Unit = {
    countMap.clear()
  }

  override def add(v: String): Unit = {
    if (!countMap.contains(v)) {
      countMap += (v -> 0)
    }
    countMap.update(v, countMap(v) + 1)
  }

  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
    other match {
      case acc: SessionStatAccumulator =>
        // Fold the other accumulator's map into this one: the initial value is
        // this.countMap and the iterated collection is acc.countMap.
        // (x /: xs)(f) is the symbolic form of xs.foldLeft(x)(f).
        acc.countMap.foldLeft(this.countMap) {
          case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0) + v))
        }
    }
  }

  override def value: mutable.HashMap[String, Int] = {
    this.countMap
  }
}
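
    To sanity-check the accumulator in isolation, here is a minimal, hypothetical harness (not part of the project; the object name, app name, and bucket keys are illustrative stand-ins for the Constants.* values, and a local Spark setup is assumed):

import org.apache.spark.sql.SparkSession

object SessionStatAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("accDemo").master("local[*]").getOrCreate()

    val acc = new SessionStatAccumulator
    spark.sparkContext.register(acc, "demoAccumulator")

    // Each add() increments one bucket; Spark merges the partition-local
    // copies of the accumulator back on the driver via merge().
    spark.sparkContext.parallelize(Seq("1s_3s", "1s_3s", "4s_6s"), 2)
      .foreach(key => acc.add(key))

    println(acc.value) // expected (order may vary): Map(1s_3s -> 2, 4s_6s -> 1)
    spark.stop()
  }
}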

    The results are written to the MySQL table session_ration_0308.

    5. Summary


    To recap: for the sessions that match the task's filter conditions, this requirement computes the share of sessions falling into each visit-length and step-length range. A custom AccumulatorV2 collects all bucket counts in a single pass over the data, and the resulting ratios are written to MySQL as one row per task run.
