

Spark Notes: Understanding the More Complex RDD APIs (Part 2)

Published: 2024/1/17

This post continues with some of the slightly more complex RDD APIs.

1) flatMapValues: applies a function that returns an iterator to each value of a pair RDD, and for every element the function returns, emits a key-value record carrying the original key.

  When I first ran into this method I found it puzzling. Looking back, the main reason is that the first flatMapValues example I saw was this one:


val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1)

val rddFlatMapVals1: RDD[(String, Int)] = rddPair.flatMapValues { x => x to 6 }

/* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */

println("====flatMapValues 1====:" + rddFlatMapVals1.collect().mkString(","))

  This example uses Scala's Range type, which represents a range of numbers. Rather than dwell on theory, let's look at the example below:


val list: List[Int] = List(1, 2, 3, 4, 5, 6)
val len: Int = list.size - 1
val r: Range = 0 to len

for (ind <- r) {
  print(list(ind) + ";") // 1;2;3;4;5;6;
}
println("")
for (ind <- 0 to len) {
  print(list(ind) + ";") // 1;2;3;4;5;6;
}
println("")

  As the code above shows, a Range such as `0 to 3` denotes the four numbers 0, 1, 2, 3, so we can use a Range to drive a for loop over that index range.
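As a quick aside (a standalone sketch, not part of the original post): `to` is inclusive while `until` stops one short of the upper bound, which is exactly the difference between `0 to len` and the `0 until list.size` loops used later in this post:

```scala
// `to` includes the upper bound; `until` excludes it
val inclusive = (0 to 3).toList
val exclusive = (0 until 3).toList

println(inclusive.mkString(",")) // 0,1,2,3
println(exclusive.mkString(",")) // 0,1,2
```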

  In fact, the function passed to flatMapValues returns a sequence-like collection (the signature in Spark accepts any TraversableOnce, so Range, List and Seq all work). A Seq in Scala is an ordered sequence, which you can roughly think of as an array. Let's look at the following examples:


def flatMapRange(par: Int): Range = {
  par to 6
}

def flatMapList(par: Int): List[Int] = {
  List(par + 1000)
}

def flatMapSeq(par: Int): Seq[Int] = {
  Seq(par + 6000)
}

val rddFlatMapVals2: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapRange(x) }
/* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
println("====flatMapValues 2====:" + rddFlatMapVals2.collect().mkString(","))

val rddFlatMapVals3: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapList(x) }
/* Result: (x01,1002),(x02,1005),(x03,1008),(x04,1003),(x01,1012),(x03,1009) */
println("====flatMapValues 3====:" + rddFlatMapVals3.collect().mkString(","))

val rddFlatMapVals4: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapSeq(x) }
/* Result: (x01,6002),(x02,6005),(x03,6008),(x04,6003),(x01,6012),(x03,6009) */
println("====flatMapValues 4====:" + rddFlatMapVals4.collect().mkString(","))

  flatMapValues naturally brings to mind the similar method flatMap. Here is an example of that one:


val rddFlatMap1: RDD[(String, Int)] = rddPair.flatMap(x => List((x._1, x._2 + 3000)))
// Result: (x01,3002),(x02,3005),(x03,3008),(x04,3003),(x01,3012),(x03,3009)
println("=====flatMap 1======:" + rddFlatMap1.collect().mkString(","))

val rddFlatMap2: RDD[Int] = rddPair.flatMap(x => List(x._2 + 8000))
// Result: 8002,8005,8008,8003,8012,8009
println("=====flatMap 2======:" + rddFlatMap2.collect().mkString(","))

val rddFlatMap3: RDD[String] = rddPair.flatMap(x => List(x._1 + "@!@" + x._2))
// Result: x01@!@2,x02@!@5,x03@!@8,x04@!@3,x01@!@12,x03@!@9
println("=====flatMap 3======:" + rddFlatMap3.collect().mkString(","))

 As you can see, the function passed to flatMap also returns a sequence, and in many cases the two methods can substitute for each other; the difference is that flatMapValues computes while leaving the first element of each tuple untouched (i.e. the key never changes). Spark does not define flatMapValues for no reason, though: it is closely tied to Spark's partitioning, which I will cover in a later article.
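To pin down the expansion rule without running Spark, here is a sketch of flatMapValues' per-record semantics using plain Scala collections (an illustration only; Spark's real implementation also preserves the partitioner, which this toy version cannot show):

```scala
// Toy re-implementation of flatMapValues' semantics on a plain List:
// apply f to each value, and attach the original key to every result
def flatMapValuesLocal[K, V, U](kvs: List[(K, V)])(f: V => Seq[U]): List[(K, U)] =
  kvs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

val pairs = List(("x01", 2), ("x02", 5), ("x04", 3))
val expanded = flatMapValuesLocal(pairs)(v => v to 6)

// (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6)
println(expanded.mkString(","))
```

Note how a value of 2 expands into five records, all keyed "x01", matching the output of the first flatMapValues example above.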

2) rightOuterJoin, leftOuterJoin and cogroup: right join, left join and co-grouping

    Let's first look at how they are used. The code is as follows:


val rdd: RDD[(String, Int)] = sc.makeRDD(List(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76)))
val other: RDD[(String, Int)] = sc.makeRDD(List(("x01", 4), ("x02", 6), ("x03", 11)))

val rddRight: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(other)
/* Result: (x02,(Some(5),6)),(x03,(Some(9),11)),(x03,(Some(21),11)),(x01,(Some(2),4)) */
println("====rightOuterJoin====:" + rddRight.collect().mkString(","))

val rddLeft: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(other)
/* Result: (x02,(5,Some(6))),(x04,(76,None)),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
println("====leftOuterJoin====:" + rddLeft.collect().mkString(","))

val rddSome = rddLeft.filter(x => x._2._2.isDefined) // filter out the None records
/* Result: (x02,(5,Some(6))),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
println("====rddSome===:" + rddSome.collect().mkString(","))

val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
/* Result: (x02,(CompactBuffer(5),CompactBuffer(6))),(x04,(CompactBuffer(76),CompactBuffer())),(x03,(CompactBuffer(9, 21),CompactBuffer(11))),(x01,(CompactBuffer(2),CompactBuffer(4))) */
println("===cogroup====:" + rddCogroup.collect().mkString(","))

  These three methods are easy to understand: they work just like right joins, left joins and grouping in a relational database. Their return types, however, confused me for quite a while when I was first learning Spark, so let's walk through them; this is really an exercise in learning Scala's data types.

  First, the Some type. Some is not a type you operate on directly; it belongs to the Option structure, and so does None. A Some holds exactly one element, e.g. Some(1) or Some((1, 2)). Why does Scala go to the trouble of defining an Option with both a Some and a None inside it? Look at the following code first:


def optionSome(): Unit = {
  /*
   * Expected output:
      =======option for 1=========
      0:3
      2:8
      3:11
      =======option for 1=========
      =======option for 2=========
      0:3
      1:None
      2:8
      3:11
      =======option for 2=========
   */
  val list: List[Option[Int]] = List(Some(3), None, Some(8), Some(11))
  println("=======option for 1=========")
  for (i <- 0 until list.size) {
    if (!list(i).isEmpty) {
      println(i + ":" + list(i).get)
    }
  }
  println("=======option for 1=========")
  println("=======option for 2=========")
  for (j <- 0 until list.size) {
    list(j) match {
      case None => println(j + ":None")
      case _    => println(j + ":" + list(j).get)
    }
  }
  println("=======option for 2=========")
}

  What Option expresses is a container that either holds a value or holds nothing. That is exactly what an outer join needs: in the final result of a left or right join, each record either matched something on the other side or it did not.
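The connection between Option and outer joins can be sketched with plain collections (a simplified model that assumes unique keys on the right side; Spark's leftOuterJoin additionally handles duplicate right-side keys by emitting one record per match):

```scala
val left  = List(("x01", 2), ("x02", 5), ("x04", 76))
val right = Map("x01" -> 4, "x02" -> 6)

// leftOuterJoin semantics: every left record survives; the right value is an
// Option that is Some(...) on a match and None otherwise
val joined: List[(String, (Int, Option[Int]))] =
  left.map { case (k, v) => (k, (v, right.get(k))) }

println(joined.mkString(",")) // (x01,(2,Some(4))),(x02,(5,Some(6))),(x04,(76,None))
```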

  The structure cogroup returns contains CompactBuffer. CompactBuffer is not a Scala collection but a Spark one: an append-optimized buffer that behaves like a Seq, so what you get back is a collection that is easy to iterate over, which suits cogroup's return type well.
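cogroup's shape, one entry per key with both sides' values gathered together, can likewise be modeled locally (a sketch using plain Seqs in place of Spark's CompactBuffer):

```scala
val rddSide   = List(("x03", 9), ("x03", 21), ("x04", 76))
val otherSide = List(("x03", 11))

// cogroup semantics: for every key present on either side,
// collect that key's values from both inputs (possibly an empty list)
val allKeys = (rddSide.map(_._1) ++ otherSide.map(_._1)).distinct
val cogrouped: List[(String, (Seq[Int], Seq[Int]))] = allKeys.map { k =>
  (k, (rddSide.filter(_._1 == k).map(_._2), otherSide.filter(_._1 == k).map(_._2)))
}

println(cogrouped.mkString(","))
```

Note that "x04" still appears in the result with an empty second list, just as the CompactBuffer() in the Spark output above.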

That wraps up this post. In the next article I will talk briefly about Spark partitioning; after that I will pause my Spark studies for a while to work on some front-end technology, as my job requires.

  Finally, here is the complete example code:


package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.collection.immutable.List
import org.apache.spark.util.collection.CompactBuffer

object ScalaTest {
  val conf: SparkConf = new SparkConf().setAppName("spark scala").setMaster("local[2]")
  val sc: SparkContext = new SparkContext(conf)

  def aggrFtnOne(par: ((Int, Int), Int)): (Int, Int) = {
    /*
     * With aggregate's initial value (0,0):
        ====aggrFtnOne Param===:((0,0),1)
        ====aggrFtnOne Param===:((1,1),2)
        ====aggrFtnOne Param===:((3,2),3)
        ====aggrFtnOne Param===:((6,3),4)
        ====aggrFtnOne Param===:((10,4),5) */
    /*
     * With aggregate's initial value (1,1):
        ====aggrFtnOne Param===:((1,1),1)
        ====aggrFtnOne Param===:((2,2),2)
        ====aggrFtnOne Param===:((4,3),3)
        ====aggrFtnOne Param===:((7,4),4)
        ====aggrFtnOne Param===:((11,5),5)
     */
    println("====aggrFtnOne Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1)
    ret
  }

  def aggrFtnTwo(par: ((Int, Int), (Int, Int))): (Int, Int) = {
    /* With aggregate's initial value (0,0): ((0,0),(15,5)) */
    /* With aggregate's initial value (1,1): ((1,1),(16,6)) */
    println("====aggrFtnTwo Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2)
    ret
  }

  def foldFtn(par: (Int, Int)): Int = {
    /* With fold's initial value 0:
        =====foldFtn Param====:(0,1)
        =====foldFtn Param====:(1,2)
        =====foldFtn Param====:(3,3)
        =====foldFtn Param====:(6,4)
        =====foldFtn Param====:(10,5)
        =====foldFtn Param====:(0,15)
     */
    /*
     * With fold's initial value 1:
        =====foldFtn Param====:(1,1)
        =====foldFtn Param====:(2,2)
        =====foldFtn Param====:(4,3)
        =====foldFtn Param====:(7,4)
        =====foldFtn Param====:(11,5)
        =====foldFtn Param====:(1,16)
     */
    println("=====foldFtn Param====:" + par.toString())
    val ret: Int = par._1 + par._2
    ret
  }

  def reduceFtn(par: (Int, Int)): Int = {
    /*
     * ======reduceFtn Param=====:1:2
       ======reduceFtn Param=====:3:3
       ======reduceFtn Param=====:6:4
       ======reduceFtn Param=====:10:5
     */
    println("======reduceFtn Param=====:" + par._1 + ":" + par._2)
    par._1 + par._2
  }

  def sparkRDDHandle(): Unit = {
    val rddInt: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5), 1)

    val rddAggr1: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
    println("====aggregate 1====:" + rddAggr1.toString()) // (15,5)

    val rddAggr2: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => aggrFtnOne(x, y), (x, y) => aggrFtnTwo(x, y)) // the tuple parentheses around the arguments can be omitted
    println("====aggregate 2====:" + rddAggr2.toString()) // (15,5)

    val rddAggr3: (Int, Int) = rddInt.aggregate((1, 1))((x, y) => aggrFtnOne((x, y)), (x, y) => aggrFtnTwo((x, y))) // with explicit tuple parentheses
    println("====aggregate 3====:" + rddAggr3.toString()) // (17,7)

    val rddAggr4: (Int, Int) = rddInt.aggregate((1, 0))((x, y) => (x._1 * y, x._2 + 1), (x, y) => (x._1 * y._1, x._2 + y._2))
    println("====aggregate 4====:" + rddAggr4.toString()) // (120,5)

    val rddFold1: Int = rddInt.fold(0)((x, y) => x + y)
    println("====fold 1====:" + rddFold1) // 15

    val rddFold2: Int = rddInt.fold(0)((x, y) => foldFtn(x, y)) // the tuple parentheses can be omitted
    println("====fold 2=====:" + rddFold2) // 15

    val rddFold3: Int = rddInt.fold(1)((x, y) => foldFtn((x, y))) // with explicit tuple parentheses
    println("====fold 3====:" + rddFold3) // 17

    val rddReduce1: Int = rddInt.reduce((x, y) => x + y)
    println("====rddReduce 1====:" + rddReduce1) // 15

    val rddReduce2: Int = rddInt.reduce((x, y) => reduceFtn(x, y))
    println("====rddReduce 2====:" + rddReduce2) // 15
  }

  def combineFtnOne(par: Int): (Int, Int) = {
    /*
     * ====combineFtnOne Param====:2
       ====combineFtnOne Param====:5
       ====combineFtnOne Param====:8
       ====combineFtnOne Param====:3
     */
    println("====combineFtnOne Param====:" + par)
    val ret: (Int, Int) = (par, 1)
    ret
  }

  def combineFtnTwo(par: ((Int, Int), Int)): (Int, Int) = {
    /*
      ====combineFtnTwo Param====:((2,1),12)
      ====combineFtnTwo Param====:((8,1),9)
     */
    println("====combineFtnTwo Param====:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1)
    ret
  }

  def combineFtnThree(par: ((Int, Int), (Int, Int))): (Int, Int) = {
    /*
     * Nothing is printed here
     */
    println("@@@@@@@@@@@@@@@@@@")
    println("====combineFtnThree Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2)
    ret
  }

  def flatMapRange(par: Int): Range = {
    par to 6
  }

  def flatMapList(par: Int): List[Int] = {
    List(par + 1000)
  }

  def flatMapSeq(par: Int): Seq[Int] = {
    Seq(par + 6000)
  }

  def sparkPairRDD(): Unit = {
    val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1)

    /* def combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] */
    val rddCombine1: RDD[(String, (Int, Int))] = rddPair.combineByKey(x => (x, 1), (com: (Int, Int), x) => (com._1 + x, com._2 + 1), (com1: (Int, Int), com2: (Int, Int)) => (com1._1 + com2._1, com1._2 + com2._2))
    println("====combineByKey 1====:" + rddCombine1.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))

    val rddCombine2: RDD[(String, (Int, Int))] = rddPair.combineByKey(x => combineFtnOne(x), (com: (Int, Int), x) => combineFtnTwo(com, x), (com1: (Int, Int), com2: (Int, Int)) => combineFtnThree(com1, com2))
    println("=====combineByKey 2====:" + rddCombine2.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))

    val rddKeys: RDD[String] = rddPair.keys
    /* Result: x01,x02,x03,x04,x01,x03  Note: keys is called without parentheses; adding them is a compile error */
    println("====keys====:" + rddKeys.collect().mkString(","))

    val rddVals: RDD[Int] = rddPair.values
    /* Result: 2,5,8,3,12,9  Note: values is called without parentheses; adding them is a compile error */
    println("=====values=====:" + rddVals.collect().mkString(","))

    val rddFlatMapVals1: RDD[(String, Int)] = rddPair.flatMapValues { x => x to 6 }
    /* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
    println("====flatMapValues 1====:" + rddFlatMapVals1.collect().mkString(","))
    val rddFlatMapVals2: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapRange(x) }
    /* Result: (x01,2),(x01,3),(x01,4),(x01,5),(x01,6),(x02,5),(x02,6),(x04,3),(x04,4),(x04,5),(x04,6) */
    println("====flatMapValues 2====:" + rddFlatMapVals2.collect().mkString(","))
    val rddFlatMapVals3: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapList(x) }
    /* Result: (x01,1002),(x02,1005),(x03,1008),(x04,1003),(x01,1012),(x03,1009) */
    println("====flatMapValues 3====:" + rddFlatMapVals3.collect().mkString(","))
    val rddFlatMapVals4: RDD[(String, Int)] = rddPair.flatMapValues { x => flatMapSeq(x) }
    /* Result: (x01,6002),(x02,6005),(x03,6008),(x04,6003),(x01,6012),(x03,6009) */
    println("====flatMapValues 4====:" + rddFlatMapVals4.collect().mkString(","))

    val rddFlatMap1: RDD[(String, Int)] = rddPair.flatMap(x => List((x._1, x._2 + 3000)))
    // Result: (x01,3002),(x02,3005),(x03,3008),(x04,3003),(x01,3012),(x03,3009)
    println("=====flatMap 1======:" + rddFlatMap1.collect().mkString(","))
    val rddFlatMap2: RDD[Int] = rddPair.flatMap(x => List(x._2 + 8000))
    // Result: 8002,8005,8008,8003,8012,8009
    println("=====flatMap 2======:" + rddFlatMap2.collect().mkString(","))
    val rddFlatMap3: RDD[String] = rddPair.flatMap(x => List(x._1 + "@!@" + x._2))
    // Result: x01@!@2,x02@!@5,x03@!@8,x04@!@3,x01@!@12,x03@!@9
    println("=====flatMap 3======:" + rddFlatMap3.collect().mkString(","))
  }

  def optionSome(): Unit = {
    /*
     * Expected output:
        =======option for 1=========
        0:3
        2:8
        3:11
        =======option for 1=========
        =======option for 2=========
        0:3
        1:None
        2:8
        3:11
        =======option for 2=========
     */
    val list: List[Option[Int]] = List(Some(3), None, Some(8), Some(11))
    println("=======option for 1=========")
    for (i <- 0 until list.size) {
      if (!list(i).isEmpty) {
        println(i + ":" + list(i).get)
      }
    }
    println("=======option for 1=========")
    println("=======option for 2=========")
    for (j <- 0 until list.size) {
      list(j) match {
        case None => println(j + ":None")
        case _    => println(j + ":" + list(j).get)
      }
    }
    println("=======option for 2=========")
  }

  def pairRDDJoinGroup(): Unit = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("x01", 2), ("x02", 5), ("x03", 9), ("x03", 21), ("x04", 76)))
    val other: RDD[(String, Int)] = sc.makeRDD(List(("x01", 4), ("x02", 6), ("x03", 11)))

    val rddRight: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(other)
    /* Result: (x02,(Some(5),6)),(x03,(Some(9),11)),(x03,(Some(21),11)),(x01,(Some(2),4)) */
    println("====rightOuterJoin====:" + rddRight.collect().mkString(","))

    val rddLeft: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(other)
    /* Result: (x02,(5,Some(6))),(x04,(76,None)),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
    println("====leftOuterJoin====:" + rddLeft.collect().mkString(","))
    val rddSome = rddLeft.filter(x => x._2._2.isDefined) // filter out the None records
    /* Result: (x02,(5,Some(6))),(x03,(9,Some(11))),(x03,(21,Some(11))),(x01,(2,Some(4))) */
    println("====rddSome===:" + rddSome.collect().mkString(","))

    val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
    /* Result: (x02,(CompactBuffer(5),CompactBuffer(6))),(x04,(CompactBuffer(76),CompactBuffer())),(x03,(CompactBuffer(9, 21),CompactBuffer(11))),(x01,(CompactBuffer(2),CompactBuffer(4))) */
    println("===cogroup====:" + rddCogroup.collect().mkString(","))
  }

  def scalaBasic(): Unit = {
    val its: Iterable[Int] = Iterable(1, 2, 3, 4, 5)
    its.foreach { x => print(x + ",") } // 1,2,3,4,5,

    val tuple2Param1: Tuple2[String, Int] = Tuple2("x01", 12) // pair built with the Tuple2 constructor
    val tuple2Param2: (String, Int) = ("x02", 29) // pair built with tuple literal syntax

    /* Result: x01:12 */
    println("====tuple2Param1====:" + tuple2Param1._1 + ":" + tuple2Param1._2)
    /* Result: x02:29 */
    println("====tuple2Param2====:" + tuple2Param2._1 + ":" + tuple2Param2._2)

    val tuple6Param1: Tuple6[String, Int, Int, Int, Int, String] = Tuple6("xx01", 1, 2, 3, 4, "x1x") // 6-tuple built with the Tuple6 constructor
    val tuple6Param2: (String, Int, Int, Int, Int, String) = ("xx02", 1, 2, 3, 4, "x2x") // 6-tuple built with tuple literal syntax

    /* Result: xx01:1:2:3:4:x1x */
    println("====tuple6Param1====:" + tuple6Param1._1 + ":" + tuple6Param1._2 + ":" + tuple6Param1._3 + ":" + tuple6Param1._4 + ":" + tuple6Param1._5 + ":" + tuple6Param1._6)
    /* Result: xx02:1:2:3:4:x2x */
    println("====tuple6Param2====:" + tuple6Param2._1 + ":" + tuple6Param2._2 + ":" + tuple6Param2._3 + ":" + tuple6Param2._4 + ":" + tuple6Param2._5 + ":" + tuple6Param2._6)

    val list: List[Int] = List(1, 2, 3, 4, 5, 6)
    val len: Int = list.size - 1
    val r: Range = 0 to len

    for (ind <- r) {
      print(list(ind) + ";") // 1;2;3;4;5;6;
    }
    println("")
    for (ind <- 0 to len) {
      print(list(ind) + ";") // 1;2;3;4;5;6;
    }
    println("")
  }

  def main(args: Array[String]): Unit = {
    scalaBasic()
    optionSome()

    sparkRDDHandle()
    sparkPairRDD()
    pairRDDJoinGroup()
  }
}
