日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据湖之iceberg系列(五)-Spark实时处理数据

發布時間:2024/1/23 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据湖之iceberg系列(五)-Spark实时处理数据 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1 接收網絡數據 ?將數據實時寫入到iceberg表中
開啟nc 服務用于模擬數據輸出

nc -lk 9999

2 spark實時讀取數據將數據寫入到iceberg表中

// 獲取spark對象
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設置數據源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數據源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設置數據源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? // 接收數據?
? ? val lines = spark.readStream.format("socket").option("host", "linux01").option("port", 9999).load()
? ? // 處理數據成DF?
? ? import ?spark.implicits._
? ? val data: DataFrame = lines.map(row => row.getAs[String]("value")).map(s => {
? ? ? val split: Array[String] = s.split(",")
? ? ? (split(0).toInt, split(1),split(2).toInt)
? ? }).toDF("id", "name","age")
? ? // 指定hadoop表位置
? ? val tableIdentifier: String = "hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user"
? ? // 將數據寫入到hadoop類型的表中
? ? val query = data.writeStream.outputMode("append").format("iceberg").option("path", tableIdentifier).option("checkpointLocation", "/").start()
? ? query.awaitTermination()
? ? spark.close()
3 spark讀取iceberg表中的數據

Logger.getLogger("org").setLevel(Level.ERROR)
? def main(args: Array[String]): Unit = {
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設置數據源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數據源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設置數據源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
?
? ? val lines = spark.read.format("iceberg").load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
? ? lines.printSchema()
? ? lines.createTempView("tb_user")
? ? // 展示表所有的文件和所有的快照信息
? ? spark.sql("select * from hadoop_prod.default.tb_user.files").show()
? ? spark.sql("select * from hadoop_prod.default.tb_user.snapshots").show()
? ?// 查詢指定快照的數據
? ? val lines2= spark.read.format("iceberg").option("snapshot-id", 9146975902480919479L).load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
? ? lines2.show()
?// ? lines.show()
?
? ? spark.close()
?
? }
結果如下?

root
?|-- id: integer (nullable = true)
?|-- name: string (nullable = true)
?|-- age: integer (nullable = true)

+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
|content| ? ? ? ? ? file_path|file_format|record_count|file_size_in_bytes| ? ? ? ?column_sizes| ? ? ? ?value_counts| ? null_value_counts| ? ? ? ?lower_bounds| ? ? ? ?upper_bounds|key_metadata|split_offsets|equality_ids|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 833|[1 -> 46, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ! ? , 2 -> ...|[1 -> ! ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 835|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 840|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 849|[1 -> 47, 2 -> 55...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
| ? ? ? ?committed_at| ? ? ? ?snapshot_id| ? ? ? ? ?parent_id|operation| ? ? ? manifest_list| ? ? ? ? ? ? summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2020-12-05 15:13:...|4974727741303617264| ? ? ? ? ? ? ? null| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:13:...|6649969826606152854|4974727741303617264| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:14:...|9146975902480919479|6649969826606152854| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:26:...|3789248833638708269|9146975902480919479| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:27:...| 145534978715502615|3789248833638708269| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:43:...| 677713801965958716| 145534978715502615| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|3022463020588869964| 677713801965958716| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|4764864293483030282|3022463020588869964| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|8363256205651138549|4764864293483030282| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+


————————————————
版權聲明:本文為CSDN博主「白眼黑刺猬」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_37933018/article/details/110690749

總結

以上是生活随笔為你收集整理的数据湖之iceberg系列(五)-Spark实时处理数据的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。