
A common Spark pom.xml for Scala, and reading CSV into an RDD through to a final join


This problem is actually not hard and would normally not be worth writing down,

but because join needs data in the format Array((), (), ()),

rather than Array(Array(), Array(), Array()), it suddenly gains a little bit of difficulty: each split Array has to be converted into a tuple first, as sketched below.
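A minimal sketch of that conversion (the HDFS path is the one used in the full program below; the intermediate names raw and pairs are made up for illustration). split() yields an RDD[Array[String]], on which join is not defined, so each Array is pattern-matched into a (key, value) tuple:

val raw   = sc.textFile("hdfs://Desktop:9000/rdd1.csv").map(_.split(","))  // RDD[Array[String]]   -- join is not available here
val pairs = raw.map { case Array(k, v) => (k, v) }                         // RDD[(String, String)] -- a pair RDD, join works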

---------------------------------------------------------------------------------------------------------------------------------------------------------

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}

// Note: this code connects to a real cluster.
// Run `mvn package` before every run, then click Run in IntelliJ.
object hello {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    val package_path = "/home/appleyuchi/桌面/spark_success/Spark數據傾斜處理/Java/hello/target/hello-1.0-SNAPSHOT.jar"
    val conf = new SparkConf()
      .setMaster("spark://Desktop:7077")
      .setJars(Array[String](package_path))
      .setAppName("TestSpark")
    val sc = new SparkContext(conf)

    // Read the two CSV files and turn each line into a (key, value) tuple
    val rdd1 = sc.textFile("hdfs://Desktop:9000/rdd1.csv")
      .map(line => line.split(","))
      .map { case Array(f1, f2) => (f1, f2) }
    val rdd2 = sc.textFile("hdfs://Desktop:9000/rdd2.csv")
      .map(line => line.split(","))
      .map { case Array(f1, f2) => (f1, f2) }

    println("---------------------rdd1-----------------------")
    println(rdd1.join(rdd2).collect().mkString)
  }
}
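As an alternative to running from IntelliJ, the packaged jar could presumably also be submitted directly with spark-submit (master URL, main class and jar path taken from the code above; exact flags depend on your cluster setup):

spark-submit --master spark://Desktop:7077 --class hello /home/appleyuchi/桌面/spark_success/Spark數據傾斜處理/Java/hello/target/hello-1.0-SNAPSHOT.jar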

------------------------------------------------------pom.xml-----------------------------------------------------------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>hello</groupId>
    <artifactId>hello</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.0.0</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
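As listed, this pom only declares dependencies. Because the project is built with mvn package before each run, the <build> section presumably also needs a Scala compiler plugin; a minimal sketch using the widely used scala-maven-plugin (the plugin version here is an assumption) could look like:

<build>
    <plugins>
        <!-- assumed addition: compiles the Scala sources during `mvn package` -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>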

---------------------------------------------------------------------------------------------------------------------------------------------------------

scala> var csv1 = sc.textFile("/file1.csv")
csv1: org.apache.spark.rdd.RDD[String] = /file1.csv MapPartitionsRDD[20] at textFile at <console>:24

scala> var data1 = csv1.map(line => line.split(",")).map{case Array(f1,f2,f3)=>(f1,f2,f3)}
data1: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[22] at map at <console>:25

scala> data1.collect()
res21: Array[(String, String, String)] = Array((user,topic,yuchi), (om,scala,yuchi), (daniel,spark,8099), (3754978,spark,199))
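Note that data1 is an RDD of three-element tuples, not a pair RDD, so join is not directly available on it. A minimal sketch of one way to make it joinable, keying on the first field (the name keyed1 is made up for illustration):

val keyed1 = data1.map { case (f1, f2, f3) => (f1, (f2, f3)) }   // RDD[(String, (String, String))] -- now a pair RDD, join works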

-------------------------------------------------------------------Appendix-------------------------------------------------------------------------------------

Upload all files to HDFS:

hdfs dfs -put file1.csv /
hdfs dfs -put file2.csv /
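The join program above reads rdd1.csv and rdd2.csv from HDFS, so presumably those two files are uploaded the same way:

hdfs dfs -put rdd1.csv /
hdfs dfs -put rdd2.csv /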

--------------------------Note: only two columns of data are allowed here, not three--------------------------------------------

Contents of rdd1.csv

001,hello
001,hello
001,hello
001,hello

--------------------

Contents of rdd2.csv

001,world
001,world
001,world
001,world
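With these two files, every row of rdd1.csv and rdd2.csv shares the key 001, so the join should produce the 4 × 4 = 16 combinations of the matching rows, each of the form:

(001,(hello,world))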

----------------------------------------------------------------------------------------------------------------------------------------------------------------

