日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark RDD创建操作

發布時間:2024/1/17 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark RDD创建操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

從集合創建RDD

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

從一個Seq集合創建RDD。

參數1:Seq集合,必須。

參數2:分區數,默認為該Application分配到的資源的CPU核數

  • scala> var rdd = sc.parallelize(1 to 10)
  • rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21
  • ?
  • scala> rdd.collect
  • res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  • ?
  • scala> rdd.partitions.size
  • res4: Int = 15
  • ?
  • //設置RDD為3個分區
  • scala> var rdd2 = sc.parallelize(1 to 10,3)
  • rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :21
  • ?
  • scala> rdd2.collect
  • res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  • ?
  • scala> rdd2.partitions.size
  • res6: Int = 3
  • ?
    • makeRDD

    def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

    這種用法和parallelize完全相同

    def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

    該用法可以指定每一個分區的preferredLocations。

  • scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),
  • (11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))
  • collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
  • List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))
  • ?
  • scala> var rdd = sc.makeRDD(collect)
  • rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at :23
  • ?
  • scala> rdd.partitions.size
  • res33: Int = 2
  • ?
  • scala> rdd.preferredLocations(rdd.partitions(0))
  • res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)
  • ?
  • scala> rdd.preferredLocations(rdd.partitions(1))
  • res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)
  • ?
  • ?
  • 指定分區的優先位置,對后續的調度優化有幫助。

    ?

    從外部存儲創建RDD

    • textFile

    //從hdfs文件創建.

  • //從hdfs文件創建
  • scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
  • rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at :21
  • ?
  • scala> rdd.count
  • res48: Long = 4
  • ?
  • //從本地文件創建
  • scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
  • rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at :21
  • ?
  • scala> rdd.count
  • res49: Long = 97
  • ?
  • 注意這里的本地文件路徑需要在Driver和Executor端存在。

    • 從其他HDFS文件格式創建

    hadoopFile

    sequenceFile

    objectFile

    newAPIHadoopFile

    • 從Hadoop接口API創建

    hadoopRDD

    newAPIHadoopRDD

    比如:從HBase創建RDD

  • scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  • import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  • ?
  • scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  • import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  • ?
  • scala> import org.apache.hadoop.hbase.client.HBaseAdmin
  • import org.apache.hadoop.hbase.client.HBaseAdmin
  • ?
  • scala> val conf = HBaseConfiguration.create()
  • scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")
  • scala> var hbaseRDD = sc.newAPIHadoopRDD(
  • conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
  • ?
  • scala> hbaseRDD.count
  • res52: Long = 1
  • ?
  • 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的Spark RDD创建操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。