Big Data Spark 2021 (51): Structured Streaming IoT Device Data Analysis
Table of Contents
IoT Device Data Analysis
Device Monitoring Data Preparation
Creating the Topic
Simulating Data
SQL Style
DSL Style
IoT Device Data Analysis
In the IoT era, huge numbers of sensors collect and produce data across every domain, day after day. Because IoT yields a continuous stream of data, real-time analytics is the natural tool for analyzing it.

Here we simulate the data analysis of a smart IoT system: generated device data is sent to Kafka, and Structured Streaming consumes it and computes statistics in real time. For the device status/signal data, we compute, in real time:
1) devices with signal strength greater than 30;
2) the number of devices of each type;
3) the average signal strength of each device type.
Device Monitoring Data Preparation
We write a program that simulates IoT device monitoring data and sends it to a Kafka topic. For demonstration purposes the record has only a few fields; in a real production project there would be many more.
Creating the Topic
Start the Kafka broker service and create the topic [iotTopic] with the following commands:
# List topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# Delete the topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic
# Create the topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic
# Console producer (for manual testing)
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic
# Console consumer (for manual testing)
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
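Note that these kafka-topics.sh commands use the older ZooKeeper-based tooling. On Kafka 2.2 and later (an assumption about your installation), the same operations take --bootstrap-server node1:9092 in place of --zookeeper node1:2181.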
Simulating Data
The simulated device-monitoring log fields are encapsulated in the case class [DeviceData]. The generator object [MockIotDatas] is as follows:
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

object MockIotDatas {

  def main(args: Array[String]): Unit = {
    // Producer properties for the Kafka topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )

    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // Build a mock device record (the Int signal widens to Double in DeviceData)
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // Serialize to a JSON string
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))

      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }

    // Close the producer (never reached while the loop above runs)
    producer.close()
  }

  /**
   * Status data sent by an IoT device
   */
  case class DeviceData(
    device: String,     // device identifier
    deviceType: String, // device type, e.g. a server (db, redis, kafka) or a router (route)
    signal: Double,     // device signal strength
    time: Long          // timestamp (ms) when the record was sent
  )
}
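One practical caveat: because the while (true) loop never exits, the producer.close() call at the end of main is unreachable. A minimal sketch of one way to still flush and close cleanly is a JVM shutdown hook registered right after the producer is created (this addition is not part of the original code):

// Assumed addition, placed inside main after the producer is created:
// flush and close the Kafka producer when the JVM is asked to stop.
sys.addShutdownHook {
  producer.flush()
  producer.close()
}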
This is analogous to servers in a large machine room periodically sending monitoring data to Kafka; the deployed services include databases (db), big-data clusters (bigdata), message queues (kafka), routers (route), and so on. Sample data:
{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}{"device":"device_30","deviceType":"kafka","signal":81.0,"time":1590660340442}{"device":"device_32","deviceType":"kafka","signal":29.0,"time":1590660340787}{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}
SQL Style
Per the business requirements, consume the log data from Kafka, extract the fields (using the get_json_object function to pull field values out of the JSON strings), register the DataFrame as a temporary view, run the analysis with SQL, and print the final result to the console. The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time analysis of IoT device status/signal data, using SQL:
 * 1) devices with signal strength greater than 30
 * 2) the number of devices of each type
 * 3) the average signal strength of each device type
 */
object IotStreamingOnlineSQL {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read from Kafka (the New Consumer API underneath)
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of offsets consumed per micro-batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the raw records into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // Take the value column as a String
      .selectExpr("CAST(value AS STRING)")
      // Convert to Dataset[String]; the single column is named value
      .as[String]
      // Drop blank records
      .filter(StringUtils.isNotBlank(_))
      // Parse the JSON, e.g. {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze per the business requirements
    // TODO: keep rows with signal > 30, group by device type, count and average the signal
    // 4.1 Register the DataFrame as a temporary view
    etlStreamDF.createOrReplaceTempView("t_iots")
    // 4.2 Run the query with SQL
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        |  device_type,
        |  COUNT(device_type) AS count_device,
        |  ROUND(AVG(signal), 2) AS avg_signal
        |FROM t_iots
        |WHERE signal > 30
        |GROUP BY device_type
        |""".stripMargin)

    // 5. Start the streaming query and print results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) {
          batchDF.coalesce(1).show(20, truncate = false)
        }
      })
      .start()
    query.awaitTermination()
    query.stop()
  }
}
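As an alternative to the four get_json_object calls, the JSON can be parsed in a single pass with from_json and an explicit schema. A sketch under the same imports and iotStreamDF as above (deviceSchema and parsedStreamDF are names introduced here for illustration):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

val deviceSchema: StructType = new StructType()
  .add("device", StringType)
  .add("deviceType", StringType)
  .add("signal", DoubleType)
  .add("time", LongType)

val parsedStreamDF: DataFrame = iotStreamDF
  .selectExpr("CAST(value AS STRING)")
  // one JSON parse per row instead of one per extracted field
  .select(from_json($"value", deviceSchema).as("data"))
  .select(
    $"data.device".as("device_id"),
    $"data.deviceType".as("device_type"),
    $"data.signal".as("signal"),
    $"data.time".as("time")
  )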
DSL Style
Per the business requirements, consume the log data from Kafka and perform the analysis by calling functions on the DataFrame directly. The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time analysis of IoT device status/signal data, using the DSL:
 * 1) devices with signal strength greater than 30
 * 2) the number of devices of each type
 * 3) the average signal strength of each device type
 */
object IotStreamingOnlineDSL {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read from Kafka (the New Consumer API underneath)
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of offsets consumed per micro-batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the raw records into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // Take the value column as a String
      .selectExpr("CAST(value AS STRING)")
      // Convert to Dataset[String]; the single column is named value
      .as[String]
      // Drop blank records
      .filter(StringUtils.isNotBlank(_))
      // Parse the JSON, e.g. {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze per the business requirements
    // TODO: keep rows with signal > 30, group by device type, count and average the signal
    val resultStreamDF: DataFrame = etlStreamDF
      // Signal strength greater than 30
      .filter($"signal" > 30)
      // Group by device type
      .groupBy($"device_type")
      // Count devices and average the signal strength
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )

    // 5. Start the streaming query and print results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
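For anything beyond a local demo, the writeStream would usually be configured with an explicit trigger interval and a checkpoint directory so the query can recover its Kafka offsets and aggregation state after a restart. A hedged sketch; the interval and path below are assumptions, not values from the original:

import org.apache.spark.sql.streaming.Trigger

val query: StreamingQuery = resultStreamDF.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .option("numRows", "10")
  .option("truncate", "false")
  // fire a micro-batch every 10 seconds instead of as fast as possible
  .trigger(Trigger.ProcessingTime("10 seconds"))
  // hypothetical path; checkpointing persists offsets and state across restarts
  .option("checkpointLocation", "/tmp/spark/ckpt/iot-dsl")
  .start()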
Summary

Both implementations consume the same iotTopic stream and produce the same result: for records with signal > 30, the per-device-type count and average signal strength, updated continuously as new data arrives.