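Spark Notes: Understanding the More Complex RDD APIs (Part 2)
This post continues with some of the slightly more complex APIs.
1) flatMapValues: applies a function that returns a collection to every value of a pair RDD, and for each element the function returns, emits a key-value record that keeps the original key.
When I first came across this method I found it puzzling and did not really understand it. Looking back, the main reason was that the first flatMapValues example I ever saw was this one: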
```scala
val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1)
val rddFlatMapVals1: RDD[(String, Int)] = rddPair.flatMapValues { x => x to 6 }
/* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
println("====flatMapValues 1====:" + rddFlatMapVals1.collect().mkString(","))
```
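The confusing part of this example is that x03 never appears in the result. The second x01 record (value 12) contributes nothing either. The sketch below reproduces the per-value step with plain Scala collections and no Spark. `pairs` is just a local copy of the data in `rddPair`. For 8, 9 and 12, `x to 6` is an empty Range. flatMapValues emits one record per element of the returned collection, so an empty collection emits no record at all.

```scala
// Local stand-in for rddPair: the same (key, value) pairs in a plain List (no Spark)
val pairs: List[(String, Int)] =
  List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9))

// Show what `x to 6` produces for each value
pairs.foreach { case (k, v) => println(s"$k -> ${(v to 6).toList}") }
// x01 -> List(2, 3, 4, 5, 6)
// x02 -> List(5, 6)
// x03 -> List()
// x04 -> List(3, 4, 5, 6)
// x01 -> List()
// x03 -> List()

// Records whose Range is empty produce no output records at all
val emptyKeys: List[String] = pairs.collect { case (k, v) if (v to 6).isEmpty => k }
println(emptyKeys) // List(x03, x01, x03)

// Total number of emitted records: 5 + 2 + 0 + 4 + 0 + 0
val emitted: Int = pairs.map { case (_, v) => (v to 6).size }.sum
println(emitted) // 11
```

The count of 11 matches the number of records in the flatMapValues result above.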
This example uses Scala's Range type. A Range is simply a range of numbers. There is not much point in dwelling on the theory, so let's look at the following example:
```scala
val list: List[Int] = List(1, 2, 3, 4, 5, 6)
val len: Int = list.size - 1
val r: Range = 0 to len

for (ind <- r) {
  print(list(ind) + ";") // 1;2;3;4;5;6;
}
println("")
for (ind <- 0 to len) {
  print(list(ind) + ";") // 1;2;3;4;5;6;
}
println("")
```
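The code above shows that `0 to 3` denotes the four numbers 0, 1, 2, 3, because the upper bound is inclusive. That is why a range can drive a for loop.
The function passed to flatMapValues must return a collection. The examples here use Seq-like types; a Seq in Scala is an ordered sequence, which you can think of as an array. Strictly speaking, the Spark 2.x signature expects a `TraversableOnce[U]`, so Range, List and Seq all qualify. Let's look at the following examples: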
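The inclusive upper bound is easy to trip over. Below is a small sketch of the common ways to build a Range; the names `inclusive`, `exclusive` and `stepped` are only for illustration. `until` excludes the upper bound, and `indices` avoids computing `size - 1` by hand:

```scala
val inclusive: Range = 0 to 3     // upper bound included
val exclusive: Range = 0 until 3  // upper bound excluded
val stepped: Range = 0 to 10 by 5 // explicit step

println(inclusive.toList) // List(0, 1, 2, 3)
println(exclusive.toList) // List(0, 1, 2)
println(stepped.toList)   // List(0, 5, 10)

// Iterating over a list's positions without writing size - 1
val list: List[Int] = List(1, 2, 3, 4, 5, 6)
for (ind <- 0 until list.size) print(s"${list(ind)};") // 1;2;3;4;5;6;
println()
for (ind <- list.indices) print(s"${list(ind)};")      // 1;2;3;4;5;6;
println()
```

Indexing a `List` by position costs linear time per access. For plain traversal, `for (x <- list)` is the more idiomatic choice.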
```scala
def flatMapRange(par: Int): Range = {
  par to 6
}

def flatMapList(par: Int): List[Int] = {
  List(par + 1000)
}

def flatMapSeq(par: Int): Seq[Int] = {
  Seq(par + 6000)
}

val rddFlatMapVals2: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapRange(x) }
/* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
println("====flatMapValues 2====:" + rddFlatMapVals2.collect().mkString(","))
val rddFlatMapVals3: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapList(x) }
/* Result: (x01,1002),(x02,1005),(x03,1008),(x04,1003),(x01,1012),(x03,1009) */
println("====flatMapValues 3====:" + rddFlatMapVals3.collect().mkString(","))
val rddFlatMapVals4: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapSeq(x) }
/* Result: (x01,6002),(x02,6005),(x03,6008),(x04,6003),(x01,6012),(x03,6009) */
println("====flatMapValues 4====:" + rddFlatMapVals4.collect().mkString(","))
```
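To see why Range, List and Seq can all be plugged in, here is a minimal local imitation of flatMapValues on an ordinary Seq of pairs. `flatMapValuesLocal` is a hypothetical helper written for this note, not part of Spark. It only mirrors the per-record behaviour: keep the key, and emit one pair per element the function returns. It accepts any `Iterable`, and Range, List and Seq all qualify.

```scala
// Hypothetical helper: a local, non-distributed imitation of flatMapValues.
// Spark's real method works on RDDs; this only mirrors its per-record behaviour.
def flatMapValuesLocal[K, V, U](data: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
  data.flatMap { case (k, v) => f(v).map(u => (k, u)) }

val pairs: Seq[(String, Int)] =
  Seq(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9))

// Range, List and Seq are all Iterable, so each fits the same function slot
val viaRange = flatMapValuesLocal(pairs)(v => v to 6)
val viaList = flatMapValuesLocal(pairs)(v => List(v + 1000))
val viaSeq = flatMapValuesLocal(pairs)(v => Seq(v + 6000))

println(viaList.mkString(","))
// (x01,1002),(x02,1005),(x03,1008),(x04,1003),(x01,1012),(x03,1009)
```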
flatMapValues inevitably brings to mind a similar method, flatMap. Let's look at some examples of it:
```scala
val rddFlatMap1: RDD[(String, Int)] = rddPair.flatMap(x => List((x._1, x._2 + 3000)))
// Result: (x01,3002),(x02,3005),(x03,3008),(x04,3003),(x01,3012),(x03,3009)
println("=====flatMap 1======:" + rddFlatMap1.collect().mkString(","))
val rddFlatMap2: RDD[Int] = rddPair.flatMap(x => List(x._2 + 8000))
// Result: 8002,8005,8008,8003,8012,8009
println("=====flatMap 2======:" + rddFlatMap2.collect().mkString(","))
val rddFlatMap3: RDD[String] = rddPair.flatMap(x => List(x._1 + "@!@" + x._2))
// Result: x01@!@2,x02@!@5,x03@!@8,x04@!@3,x01@!@12,x03@!@9
println("=====flatMap 3======:" + rddFlatMap3.collect().mkString(","))
```
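So the function passed to flatMap likewise returns a collection, and the two methods can often stand in for each other. The difference is that flatMapValues computes with the first element of the tuple (the key) held fixed. Spark does not define flatMapValues for nothing, though: it is closely tied to partitioning. Because the key cannot change, flatMapValues keeps the parent RDD's partitioner, whereas flatMap discards it. I will discuss Spark partitioning in a later article.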
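The link to partitioning can be illustrated without Spark. As far as I know, flatMapValues (like mapValues) keeps the parent RDD's partitioner, while flatMap drops it, because a function given to flatMap is free to change keys. The sketch below uses two hypothetical helpers, `partitionOf` and `placements`. `partitionOf` mimics hash placement: non-negative `key.hashCode` modulo the partition count, which is how Spark's HashPartitioner assigns keys. `placements` checks where each output record would land relative to its input record.

```scala
// Hypothetical local model of hash placement (not Spark's HashPartitioner class)
val numPartitions = 2
def partitionOf(key: Any): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // keep the result non-negative
}

val pairs: Seq[(String, Int)] = Seq(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3))

// For each input record: (its partition, partitions of the records it produces)
def placements(f: ((String, Int)) => Seq[(String, Int)]): Seq[(Int, Seq[Int])] =
  pairs.map(rec => (partitionOf(rec._1), f(rec).map(out => partitionOf(out._1))))

// flatMapValues-style function: only values change, keys are untouched
val valueOnly = placements { case (k, v) => Seq(k -> (v + 1000), k -> (v + 2000)) }
// a general flatMap function that rewrites the key
val keyRewriting = placements { case (k, v) => Seq((k + "!") -> v) }

val valueOnlyStays = valueOnly.forall { case (in, outs) => outs.forall(_ == in) }
val keyRewritingStays = keyRewriting.forall { case (in, outs) => outs.forall(_ == in) }
println(valueOnlyStays)    // true
println(keyRewritingStays) // false
```

Since value-only output always stays where its input was, Spark can trust the existing partitioning after flatMapValues. After flatMap it cannot make that assumption.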
2) rightOuterJoin, leftOuterJoin, cogroup: right outer join, left outer join and grouping
Let's first look at how they are used:
```scala
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76)))
val other: RDD[(String, Int)] = sc.makeRDD(List(("x01", 4), ("x02", 6), ("x03", 11)))

val rddRight: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(other)
/* Result: (x02,(Some(5),6)),(x03,(Some(9),11)),(x03,(Some(21),11)),(x01,(Some(2),4)) */
println("====rightOuterJoin====:" + rddRight.collect().mkString(","))

val rddLeft: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(other)
/* Result: (x02,(5,Some(6))),(x04,(76,None)),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
println("====leftOuterJoin====:" + rddLeft.collect().mkString(","))
val rddSome = rddLeft.filter(x => x._2._2.isDefined) // drop records whose right-hand value is None
/* Result: (x02,(5,Some(6))),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
println("====rddSome===:" + rddSome.collect().mkString(","))

val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
/* Result: (x02,(CompactBuffer(5),CompactBuffer(6))),(x04,(CompactBuffer(76),CompactBuffer())),(x03,(CompactBuffer(9, 21),CompactBuffer(11))),(x01,(CompactBuffer(2),CompactBuffer(4))) */
println("===cogroup====:" + rddCogroup.collect().mkString(","))
```
These three methods are easy to understand: they behave like outer joins and grouping in a relational database. Their return values, however, puzzled me for quite a while when I was first learning Spark. So I will go through them here; this is really a matter of learning Scala's data types.
First comes Some. Some is not a type you use on its own: it belongs to the Option data structure, and so does None. A Some holds exactly one element, for example Some(1) or Some((1,2)). Why does Scala go to the trouble of defining Option with a Some and a None? Let's look at the following code first:
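Where does the `Option` in these result types come from? Here is a minimal local sketch that uses plain Seqs instead of RDDs. `leftOuterJoinLocal` and `rightOuterJoinLocal` are hypothetical helpers that mimic the semantics, not Spark's implementation. Every record from the preserved side appears in the output. The other side's value is `Some(...)` when the key matched and `None` when it did not. Here the output order follows the input order, whereas Spark's order depends on hash partitioning.

```scala
// Hypothetical local imitation of leftOuterJoin on plain Seqs (not Spark's code)
def leftOuterJoinLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
  left.flatMap { case (k, v) =>
    val matches: Seq[W] = right.collect { case (k2, w) if k2 == k => w }
    if (matches.isEmpty) Seq((k, (v, Option.empty[W]))) // unmatched key: None
    else matches.map(w => (k, (v, Option(w))))         // one record per match: Some(w)
  }

// A right outer join is a left outer join with the sides swapped back
def rightOuterJoinLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
  leftOuterJoinLocal(right, left).map { case (k, (w, ov)) => (k, (ov, w)) }

val rdd = Seq(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76))
val other = Seq(("x01", 4), ("x02", 6), ("x03", 11))

val joinedLeft = leftOuterJoinLocal(rdd, other)
val joinedRight = rightOuterJoinLocal(rdd, other)
println(joinedLeft.mkString(","))
// (x01,(2,Some(4))),(x02,(5,Some(6))),(x03,(9,Some(11))),(x03,(21,Some(11))),(x04,(76,None))
println(joinedRight.mkString(","))
// (x01,(Some(2),4)),(x02,(Some(5),6)),(x03,(Some(9),11)),(x03,(Some(21),11))
```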
```scala
def optionSome(): Unit = {
  /*
   * =======option for 1=========
   * 0:3
   * 2:8
   * 3:11
   * =======option for 1=========
   * =======option for 2=========
   * 0:3
   * 1:None
   * 2:8
   * 3:11
   * =======option for 2=========
   */
  val list: List[Option[Int]] = List(Some(3), None, Some(8), Some(11))
  println("=======option for 1=========")
  for (i <- 0 until list.size) {
    if (!list(i).isEmpty) {
      println(i + ":" + list(i).get)
    }
  }
  println("=======option for 1=========")
  println("=======option for 2=========")
  for (j <- 0 until list.size) {
    list(j) match {
      case None    => println(j + ":None")
      case Some(v) => println(j + ":" + v) // bind the wrapped value instead of calling get
    }
  }
  println("=======option for 2=========")
}
```
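What Option expresses is a container that either holds a value or holds nothing. This is exactly what outer joins need: each record in the result either found a match or did not.
The grouped values returned by cogroup print as CompactBuffer. CompactBuffer is not a Scala data structure but one of Spark's own, an internal class declared `private[spark]`. It extends Scala's Seq, so each group is a collection that is easy to iterate over, which suits cogroup's return type well. The API itself declares these values as `Iterable[Int]`; CompactBuffer is only the concrete class that shows up when the result is printed.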
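Here are a few everyday Option idioms, using the same list as in `optionSome`. Pattern matching on `Some(v)` binds the value directly, so `.get` (which throws on `None`) is rarely needed. `getOrElse`, `map` and `flatten` cover most other cases. `getOrElse` also fills in defaults for unmatched rows of a left outer join. The value names here are illustrative.

```scala
val list: List[Option[Int]] = List(Some(3), None, Some(8), Some(11))

// Pattern matching binds the wrapped value; no .get needed
list.zipWithIndex.foreach {
  case (Some(v), i) => println(s"$i:$v")
  case (None, i)    => println(s"$i:None")
}

val withDefault: List[Int] = list.map(_.getOrElse(0))   // List(3, 0, 8, 11)
val doubled: List[Option[Int]] = list.map(_.map(_ * 2)) // List(Some(6), None, Some(16), Some(22))
val present: List[Int] = list.flatten                   // List(3, 8, 11)

// A leftOuterJoin-shaped record: supply a default when the key did not match
val joined: (String, (Int, Option[Int])) = ("x04", (76, None))
val filled: (String, (Int, Int)) = joined match {
  case (k, (v, w)) => (k, (v, w.getOrElse(0)))
}
println(filled) // (x04,(76,0))
```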
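Here is a local sketch of what cogroup computes, using plain Seqs in place of RDDs. `cogroupLocal` is a hypothetical helper, not Spark's code. For every key present on either side, it gathers that key's values from each side. An unmatched key therefore gets an empty collection, like `CompactBuffer()` for x04 above. Because each side is an ordinary collection, an inner join is just a nested loop over the two groups. As far as I know, Spark's own join is built on top of cogroup in much the same way.

```scala
// Hypothetical local imitation of cogroup on plain Seqs (not Spark's code)
def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val keys = (left.map(_._1) ++ right.map(_._1)).distinct
  keys.map { k =>
    val leftVals: Seq[V] = left.collect { case (k2, v) if k2 == k => v }
    val rightVals: Seq[W] = right.collect { case (k2, w) if k2 == k => w }
    (k, (leftVals, rightVals))
  }.toMap
}

val rdd = Seq(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76))
val other = Seq(("x01", 4), ("x02", 6), ("x03", 11))
val grouped = cogroupLocal(rdd, other)

println(grouped("x03")) // (List(9, 21),List(11))
println(grouped("x04")) // (List(76),List())

// Each group is an ordinary collection, so an inner join is a nested loop per key
val innerJoin: Seq[(String, (Int, Int))] =
  grouped.toSeq.flatMap { case (k, (vs, ws)) => for (v <- vs; w <- ws) yield (k, (v, w)) }
```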
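That wraps up this post. In the next article I will briefly talk about Spark partitioning. After that I will probably pause my Spark study for a while to work on some front-end technologies, because my job requires it.
Finally, here is the complete sample code: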
```scala
package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits; only required before Spark 1.3
import org.apache.spark.rdd.RDD

object ScalaTest {
  val conf: SparkConf = new SparkConf().setAppName("spark scala").setMaster("local[2]")
  val sc: SparkContext = new SparkContext(conf)

  def aggrFtnOne(par: ((Int, Int), Int)): (Int, Int) = {
    /*
     * aggregate with initial value (0,0):
     *   ====aggrFtnOne Param===:((0,0),1)
     *   ====aggrFtnOne Param===:((1,1),2)
     *   ====aggrFtnOne Param===:((3,2),3)
     *   ====aggrFtnOne Param===:((6,3),4)
     *   ====aggrFtnOne Param===:((10,4),5)
     */
    /*
     * aggregate with initial value (1,1):
     *   ====aggrFtnOne Param===:((1,1),1)
     *   ====aggrFtnOne Param===:((2,2),2)
     *   ====aggrFtnOne Param===:((4,3),3)
     *   ====aggrFtnOne Param===:((7,4),4)
     *   ====aggrFtnOne Param===:((11,5),5)
     */
    println("====aggrFtnOne Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1)
    ret
  }

  def aggrFtnTwo(par: ((Int, Int), (Int, Int))): (Int, Int) = {
    /* aggregate with initial value (0,0): ((0,0),(15,5)) */
    /* aggregate with initial value (1,1): ((1,1),(16,6)) */
    println("====aggrFtnTwo Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2)
    ret
  }

  def foldFtn(par: (Int, Int)): Int = {
    /*
     * fold with initial value 0:
     *   =====foldFtn Param====:(0,1)
     *   =====foldFtn Param====:(1,2)
     *   =====foldFtn Param====:(3,3)
     *   =====foldFtn Param====:(6,4)
     *   =====foldFtn Param====:(10,5)
     *   =====foldFtn Param====:(0,15)
     */
    /*
     * fold with initial value 1:
     *   =====foldFtn Param====:(1,1)
     *   =====foldFtn Param====:(2,2)
     *   =====foldFtn Param====:(4,3)
     *   =====foldFtn Param====:(7,4)
     *   =====foldFtn Param====:(11,5)
     *   =====foldFtn Param====:(1,16)
     */
    println("=====foldFtn Param====:" + par.toString())
    val ret: Int = par._1 + par._2
    ret
  }

  def reduceFtn(par: (Int, Int)): Int = {
    /*
     * ======reduceFtn Param=====:1:2
     * ======reduceFtn Param=====:3:3
     * ======reduceFtn Param=====:6:4
     * ======reduceFtn Param=====:10:5
     */
    println("======reduceFtn Param=====:" + par._1 + ":" + par._2)
    par._1 + par._2
  }

  def sparkRDDHandle(): Unit = {
    val rddInt: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5), 1)

    val rddAggr1: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
    println("====aggregate 1====:" + rddAggr1.toString()) // (15,5)

    val rddAggr2: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => aggrFtnOne(x, y), (x, y) => aggrFtnTwo(x, y)) // the tuple's parentheses can be omitted (auto-tupling)
    println("====aggregate 2====:" + rddAggr2.toString()) // (15,5)

    val rddAggr3: (Int, Int) = rddInt.aggregate((1, 1))((x, y) => aggrFtnOne((x, y)), (x, y) => aggrFtnTwo((x, y))) // explicit tuple parentheses
    println("====aggregate 3====:" + rddAggr3.toString()) // (17,7)

    val rddAggr4: (Int, Int) = rddInt.aggregate((1, 0))((x, y) => (x._1 * y, x._2 + 1), (x, y) => (x._1 * y._1, x._2 + y._2))
    println("====aggregate 4====:" + rddAggr4.toString()) // (120,5)

    val rddFold1: Int = rddInt.fold(0)((x, y) => x + y)
    println("====fold 1====:" + rddFold1) // 15

    val rddFold2: Int = rddInt.fold(0)((x, y) => foldFtn(x, y)) // the tuple's parentheses can be omitted
    println("====fold 2=====:" + rddFold2) // 15

    val rddFold3: Int = rddInt.fold(1)((x, y) => foldFtn((x, y))) // explicit tuple parentheses
    println("====fold 3====:" + rddFold3) // 17

    val rddReduce1: Int = rddInt.reduce((x, y) => x + y)
    println("====rddReduce 1====:" + rddReduce1) // 15

    val rddReduce2: Int = rddInt.reduce((x, y) => reduceFtn(x, y))
    println("====rddReduce 2====:" + rddReduce2) // 15
  }

  def combineFtnOne(par: Int): (Int, Int) = {
    /*
     * ====combineFtnOne Param====:2
     * ====combineFtnOne Param====:5
     * ====combineFtnOne Param====:8
     * ====combineFtnOne Param====:3
     */
    println("====combineFtnOne Param====:" + par)
    val ret: (Int, Int) = (par, 1)
    ret
  }

  def combineFtnTwo(par: ((Int, Int), Int)): (Int, Int) = {
    /*
     * ====combineFtnTwo Param====:((2,1),12)
     * ====combineFtnTwo Param====:((8,1),9)
     */
    println("====combineFtnTwo Param====:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1)
    ret
  }

  def combineFtnThree(par: ((Int, Int), (Int, Int))): (Int, Int) = {
    /*
     * Nothing is printed: rddPair has a single partition, so there are never
     * two per-partition combiners for the same key that need merging.
     */
    println("@@@@@@@@@@@@@@@@@@")
    println("====combineFtnThree Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2)
    ret
  }

  def flatMapRange(par: Int): Range = {
    par to 6
  }

  def flatMapList(par: Int): List[Int] = {
    List(par + 1000)
  }

  def flatMapSeq(par: Int): Seq[Int] = {
    Seq(par + 6000)
  }

  def sparkPairRDD(): Unit = {
    val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1)

    /* def combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] */
    val rddCombine1: RDD[(String, (Int, Int))] = rddPair.combineByKey(x => (x, 1), (com: (Int, Int), x) => (com._1 + x, com._2 + 1), (com1: (Int, Int), com2: (Int, Int)) => (com1._1 + com2._1, com1._2 + com2._2))
    println("====combineByKey 1====:" + rddCombine1.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))

    val rddCombine2: RDD[(String, (Int, Int))] = rddPair.combineByKey(x => combineFtnOne(x), (com: (Int, Int), x) => combineFtnTwo(com, x), (com1: (Int, Int), com2: (Int, Int)) => combineFtnThree(com1, com2))
    println("=====combineByKey 2====:" + rddCombine2.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))

    val rddKeys: RDD[String] = rddPair.keys
    /* Result: x01,x02,x03,x04,x01,x03  Note: call keys without parentheses, otherwise it does not compile */
    println("====keys====:" + rddKeys.collect().mkString(","))

    val rddVals: RDD[Int] = rddPair.values
    /* Result: 2,5,8,3,12,9  Note: call values without parentheses, otherwise it does not compile */
    println("=====values=====:" + rddVals.collect().mkString(","))

    val rddFlatMapVals1: RDD[(String, Int)] = rddPair.flatMapValues { x => x to 6 }
    /* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
    println("====flatMapValues 1====:" + rddFlatMapVals1.collect().mkString(","))
    val rddFlatMapVals2: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapRange(x) }
    /* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
    println("====flatMapValues 2====:" + rddFlatMapVals2.collect().mkString(","))
    val rddFlatMapVals3: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapList(x) }
    /* Result: (x01,1002),(x02,1005),(x03,1008),(x04,1003),(x01,1012),(x03,1009) */
    println("====flatMapValues 3====:" + rddFlatMapVals3.collect().mkString(","))
    val rddFlatMapVals4: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapSeq(x) }
    /* Result: (x01,6002),(x02,6005),(x03,6008),(x04,6003),(x01,6012),(x03,6009) */
    println("====flatMapValues 4====:" + rddFlatMapVals4.collect().mkString(","))

    val rddFlatMap1: RDD[(String, Int)] = rddPair.flatMap(x => List((x._1, x._2 + 3000)))
    // Result: (x01,3002),(x02,3005),(x03,3008),(x04,3003),(x01,3012),(x03,3009)
    println("=====flatMap 1======:" + rddFlatMap1.collect().mkString(","))
    val rddFlatMap2: RDD[Int] = rddPair.flatMap(x => List(x._2 + 8000))
    // Result: 8002,8005,8008,8003,8012,8009
    println("=====flatMap 2======:" + rddFlatMap2.collect().mkString(","))
    val rddFlatMap3: RDD[String] = rddPair.flatMap(x => List(x._1 + "@!@" + x._2))
    // Result: x01@!@2,x02@!@5,x03@!@8,x04@!@3,x01@!@12,x03@!@9
    println("=====flatMap 3======:" + rddFlatMap3.collect().mkString(","))
  }

  def optionSome(): Unit = {
    /*
     * =======option for 1=========
     * 0:3
     * 2:8
     * 3:11
     * =======option for 1=========
     * =======option for 2=========
     * 0:3
     * 1:None
     * 2:8
     * 3:11
     * =======option for 2=========
     */
    val list: List[Option[Int]] = List(Some(3), None, Some(8), Some(11))
    println("=======option for 1=========")
    for (i <- 0 until list.size) {
      if (!list(i).isEmpty) {
        println(i + ":" + list(i).get)
      }
    }
    println("=======option for 1=========")
    println("=======option for 2=========")
    for (j <- 0 until list.size) {
      list(j) match {
        case None    => println(j + ":None")
        case Some(v) => println(j + ":" + v) // bind the wrapped value instead of calling get
      }
    }
    println("=======option for 2=========")
  }

  def pairRDDJoinGroup(): Unit = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76)))
    val other: RDD[(String, Int)] = sc.makeRDD(List(("x01", 4), ("x02", 6), ("x03", 11)))

    val rddRight: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(other)
    /* Result: (x02,(Some(5),6)),(x03,(Some(9),11)),(x03,(Some(21),11)),(x01,(Some(2),4)) */
    println("====rightOuterJoin====:" + rddRight.collect().mkString(","))

    val rddLeft: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(other)
    /* Result: (x02,(5,Some(6))),(x04,(76,None)),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
    println("====leftOuterJoin====:" + rddLeft.collect().mkString(","))
    val rddSome = rddLeft.filter(x => x._2._2.isDefined) // drop records whose right-hand value is None
    /* Result: (x02,(5,Some(6))),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
    println("====rddSome===:" + rddSome.collect().mkString(","))

    val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
    /* Result: (x02,(CompactBuffer(5),CompactBuffer(6))),(x04,(CompactBuffer(76),CompactBuffer())),(x03,(CompactBuffer(9, 21),CompactBuffer(11))),(x01,(CompactBuffer(2),CompactBuffer(4))) */
    println("===cogroup====:" + rddCogroup.collect().mkString(","))
  }

  def scalaBasic(): Unit = {
    val its: Iterable[Int] = Iterable(1, 2, 3, 4, 5)
    its.foreach { x => print(x + ",") } // 1,2,3,4,5,

    val tuple2Param1: Tuple2[String, Int] = Tuple2("x01", 12) // pair built with the explicit Tuple2 constructor
    val tuple2Param2: (String, Int) = ("x02", 29)             // pair built with a tuple literal

    /* Result: x01:12 */
    println("====tuple2Param1====:" + tuple2Param1._1 + ":" + tuple2Param1._2)
    /* Result: x02:29 */
    println("====tuple2Param2====:" + tuple2Param2._1 + ":" + tuple2Param2._2)

    val tuple6Param1: Tuple6[String, Int, Int, Int, Int, String] = Tuple6("xx01", 1, 2, 3, 4, "x1x") // 6-tuple built with the explicit constructor
    val tuple6Param2: (String, Int, Int, Int, Int, String) = ("xx02", 1, 2, 3, 4, "x2x")              // 6-tuple built with a literal

    /* Result: xx01:1:2:3:4:x1x */
    println("====tuple6Param1====:" + tuple6Param1._1 + ":" + tuple6Param1._2 + ":" + tuple6Param1._3 + ":" + tuple6Param1._4 + ":" + tuple6Param1._5 + ":" + tuple6Param1._6)
    /* Result: xx02:1:2:3:4:x2x */
    println("====tuple6Param2====:" + tuple6Param2._1 + ":" + tuple6Param2._2 + ":" + tuple6Param2._3 + ":" + tuple6Param2._4 + ":" + tuple6Param2._5 + ":" + tuple6Param2._6)

    val list: List[Int] = List(1, 2, 3, 4, 5, 6)
    val len: Int = list.size - 1
    val r: Range = 0 to len

    for (ind <- r) {
      print(list(ind) + ";") // 1;2;3;4;5;6;
    }
    println("")
    for (ind <- 0 to len) {
      print(list(ind) + ";") // 1;2;3;4;5;6;
    }
    println("")
  }

  def main(args: Array[String]): Unit = {
    scalaBasic()
    optionSome()

    sparkRDDHandle()
    sparkPairRDD()
    pairRDDJoinGroup()

    sc.stop() // release the local SparkContext when done
  }
}
```
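The comments in `sparkRDDHandle` show `fold(1)` returning 17 on the single-partition `rddInt`, not the 16 a plain Scala fold would give. Similarly, `aggregate((1, 1))` returns (17,7). As I understand Spark's documented behaviour, the zero value is used once per partition and once more when the partition results are merged on the driver. `foldLikeSpark` below is a hypothetical local simulation of that rule, not Spark's implementation. It shows why the zero value of a Spark fold should be an identity element, such as 0 for addition.

```scala
// Hypothetical local simulation of Spark's fold semantics (no Spark needed):
// zero is applied to every partition, then once more for the final merge.
def foldLikeSpark(partitions: Seq[Seq[Int]], zero: Int)(op: (Int, Int) => Int): Int = {
  val perPartition = partitions.map(_.foldLeft(zero)(op)) // one zero per partition
  perPartition.foldLeft(zero)(op)                          // one more zero for the merge
}

val onePartition = Seq(Seq(1, 2, 3, 4, 5))
val twoPartitions = Seq(Seq(1, 2), Seq(3, 4, 5))

println(foldLikeSpark(onePartition, 0)(_ + _))  // 15
println(foldLikeSpark(onePartition, 1)(_ + _))  // 17: 1 + (1 + 1 + 2 + 3 + 4 + 5)
println(foldLikeSpark(twoPartitions, 1)(_ + _)) // 18: one extra zero per partition
println(Seq(1, 2, 3, 4, 5).foldLeft(1)(_ + _))  // 16: plain Scala uses the zero only once
```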