数据湖之iceberg系列(五)-Spark实时处理数据
1 接收網絡數據 ?將數據實時寫入到iceberg表中
開啟nc 服務用于模擬數據輸出
nc -lk 9999
2 spark實時讀取數據將數據寫入到iceberg表中
// 獲取spark對象
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設置數據源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數據源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設置數據源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? // 接收數據?
? ? val lines = spark.readStream.format("socket").option("host", "linux01").option("port", 9999).load()
? ? // 處理數據成DF?
? ? import ?spark.implicits._
? ? val data: DataFrame = lines.map(row => row.getAs[String]("value")).map(s => {
? ? ? val split: Array[String] = s.split(",")
? ? ? (split(0).toInt, split(1),split(2).toInt)
? ? }).toDF("id", "name","age")
? ? // 指定hadoop表位置
? ? val tableIdentifier: String = "hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user"
? ? // 將數據寫入到hadoop類型的表中
? ? val query = data.writeStream.outputMode("append").format("iceberg").option("path", tableIdentifier).option("checkpointLocation", "/").start()
? ? query.awaitTermination()
? ? spark.close()
3 spark讀取iceberg表中的數據
Logger.getLogger("org").setLevel(Level.ERROR)
? def main(args: Array[String]): Unit = {
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設置數據源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數據源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設置數據源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
?
? ? val lines = spark.read.format("iceberg").load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
? ? lines.printSchema()
? ? lines.createTempView("tb_user")
? ? // 展示表所有的文件和所有的快照信息
? ? spark.sql("select * from hadoop_prod.default.tb_user.files").show()
? ? spark.sql("select * from hadoop_prod.default.tb_user.snapshots").show()
? ?// 查詢指定快照的數據
? ? val lines2= spark.read.format("iceberg").option("snapshot-id", 9146975902480919479L).load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
? ? lines2.show()
?// ? lines.show()
?
? ? spark.close()
?
? }
結果如下?
root
?|-- id: integer (nullable = true)
?|-- name: string (nullable = true)
?|-- age: integer (nullable = true)
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
|content| ? ? ? ? ? file_path|file_format|record_count|file_size_in_bytes| ? ? ? ?column_sizes| ? ? ? ?value_counts| ? null_value_counts| ? ? ? ?lower_bounds| ? ? ? ?upper_bounds|key_metadata|split_offsets|equality_ids|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 833|[1 -> 46, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ! ? , 2 -> ...|[1 -> ! ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 835|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 840|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
| ? ? ?0|hdfs://linux01:80...| ? ?PARQUET| ? ? ? ? ? 1| ? ? ? ? ? ? ? 849|[1 -> 47, 2 -> 55...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ? , 2 -> ...|[1 -> ? , 2 -> ...| ? ? ? ?null| ? ? ? ? ?[4]| ? ? ? ?null|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
| ? ? ? ?committed_at| ? ? ? ?snapshot_id| ? ? ? ? ?parent_id|operation| ? ? ? manifest_list| ? ? ? ? ? ? summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2020-12-05 15:13:...|4974727741303617264| ? ? ? ? ? ? ? null| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:13:...|6649969826606152854|4974727741303617264| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:14:...|9146975902480919479|6649969826606152854| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:26:...|3789248833638708269|9146975902480919479| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:27:...| 145534978715502615|3789248833638708269| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:43:...| 677713801965958716| 145534978715502615| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|3022463020588869964| 677713801965958716| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|4764864293483030282|3022463020588869964| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|8363256205651138549|4764864293483030282| ? append|hdfs://linux01:80...|[spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
————————————————
版權聲明:本文為CSDN博主「白眼黑刺猬」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_37933018/article/details/110690749
總結
以上是生活随笔為你收集整理的数据湖之iceberg系列(五)-Spark实时处理数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据湖之iceberg系列(四)iceb
- 下一篇: 数据湖探索与实践