日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

在Spark中自定义Kryo序列化输入输出API(转)

發布時間:2024/4/17 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 在Spark中自定义Kryo序列化输入输出API(转) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文鏈接:在Spark中自定義Kryo序列化輸入輸出API

Spark中內置支持兩種系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默認情況下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有繼承java.io.Serializable的類系列化,雖然Java系列化非常靈活,但是它的性能不佳。然而我們可以使用Kryo 庫來系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化對象,而且要求用戶注冊類。

  在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量數據的情況下,使用 Kryo系列化就變得非常重要。

  雖然Kryo支持對RDD的cache和shuffle,但是在Spark中不是內置就顯示提供使用Kryo將數據系列化到磁盤中的輸入輸出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法僅僅支持使用Java系列化。所以如果我們可以使用Kryo系列化將會變得很棒!

  在這篇文章中,我將討論如何自定義Kryo系列化輸出輸出相關API來將數據進行讀寫到磁盤中。

寫數據

  通常,我們使用rdd.saveAsObjectFile?API將已經系列化的對象寫入到磁盤中。下面的代碼將展示如何使用我們自定義的saveAsObjectFile方法將已經使用kryo系列化的對象寫入到磁盤中:

1def?saveAsObjectFile[T:?ClassTag](rdd:?RDD[T], path:?String)

這個函數中參數rdd就是我們需要寫的數據;path是數據保存的路徑。

1val?kryoSerializer?=?new?KryoSerializer(rdd.context.getConf)

  KryoSerializer是Spark內部提供的用于提供操作Kryo的類。在上述代碼中,我們創建了KryoSerializer對象,并從rdd.context.getConf中獲取傳進來的緩存大小。

1rdd.mapPartitions(iter?=> iter.grouped(10)
2??????.map(_.toArray))
3??????.map(splitArray?=> {}

所有的objectFile 將會在HDFS上保存,我們對RDD中的每個分片進行遍歷,然后將他們轉換成Byte數組。

1val?kryo?=?kryoSerializer.newKryo()

  對每個splitArray,我們首先創建了kryo實例,kryo是線程不安全的,所以我們在每個map操作中單獨創建。當我們調用?kryoSerializer.newKryo()來創建新的kryo實例,他也會調用我們自定義的KryoRegistrator。

1//create output stream and plug it to the kryo output
2val?bao?=?new?ByteArrayOutputStream()
3val?output?=?kryoSerializer.newKryoOutput()
4output.setOutputStream(bao)
5kryo.writeClassAndObject(output, splitArray)
6output.close()

一旦我們擁有kryo實例,我們就可以創建kryo輸出對象,然后我們將類信息和對象寫入到那個輸出對象中。

1val?byteWritable?=?new?BytesWritable(bao.toByteArray)
2??????(NullWritable.get(), byteWritable)
3????}).saveAsSequenceFile(path)

  我們在創建byteWritable的時候,包裝了bytearray,然后保存成Sequence文件。使用那些代碼我們就可以將Kryo對象寫入到磁盤中。完整代碼如下:

01/**
02?* User: 過往記憶
03?* Date: 15-04-24
04?* Time: 上午07:24
05?* bolg:?http://www.iteblog.com
06?* 本文地址:http://www.iteblog.com/archives/1328
07?* 過往記憶博客,專注于hadoop、hive、spark、shark、flume的技術博客,大量的干貨
08?* 過往記憶博客微信公共帳號:iteblog_hadoop
09?*/
10?
11??def?saveAsObjectFile[T:?ClassTag](rdd:?RDD[T], path:?String) {
12????val?kryoSerializer?=?new?KryoSerializer(rdd.context.getConf)
13?
14????rdd.mapPartitions(iter?=> iter.grouped(10)
15??????.map(_.toArray))
16??????.map(splitArray?=> {
17??????//initializes kyro and calls your registrator class
18??????val?kryo?=?kryoSerializer.newKryo()
19?
20??????//convert data to bytes
21??????val?bao?=?new?ByteArrayOutputStream()
22??????val?output?=?kryoSerializer.newKryoOutput()
23??????output.setOutputStream(bao)
24??????kryo.writeClassAndObject(output, splitArray)
25??????output.close()
26?
27??????// We are ignoring key field of sequence file
28??????val?byteWritable?=?new?BytesWritable(bao.toByteArray)
29??????(NullWritable.get(), byteWritable)
30????}).saveAsSequenceFile(path)
31??}

讀數據

  光有寫沒有讀對我們來說仍然不完美。通常我們使用sparkContext中的objectFile API從磁盤中讀取數據,這里我們使用自定義的objectFile API來讀取Kryo對象文件。

01def?objectFile[T](sc:?SparkContext, path:?String, minPartitions:?Int?=?1)
02????(implicit?ct:?ClassTag[T])?=?{
03????val?kryoSerializer?=?new?KryoSerializer(sc.getConf)
04????sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
05???????minPartitions)
06???????.flatMap(x?=> {
07???????val?kryo?=?kryoSerializer.newKryo()
08???????val?input?=?new?Input()
09???????input.setBuffer(x._2.getBytes)
10???????val?data?=?kryo.readClassAndObject(input)
11???????val?dataObject?=?data.asInstanceOf[Array[T]]
12???????dataObject
13????})
14??}

上面的步驟和寫的步驟很類似,只不過這里我們使用的是input,而不是output。我們從BytesWritable中讀取bytes數據,然后使用readClassAndObject API反序列化數據。

如何使用

  下面例子使用上面介紹的兩個方法來系列化和反序列化Person對象:

查看源代碼 打印幫助
01/**
02?* User: 過往記憶
03?* Date: 15-04-24
04?* Time: 上午07:24
05?* bolg:?http://www.iteblog.com
06?* 本文地址:http://www.iteblog.com/archives/1328
07?* 過往記憶博客,專注于hadoop、hive、spark、shark、flume的技術博客,大量的干貨
08?* 過往記憶博客微信公共帳號:iteblog_hadoop
09?*/
10?
11// user defined class that need to serialized
12??class?Person(val?name:?String)
13?
14?def?main(args:?Array[String]) {
15?
16????if?(args.length <?1) {
17??????println("Please provide output path")
18??????return
19????}
20????val?outputPath?=?args(0)
21?
22????val?conf?=?new?SparkConf().setMaster("local").setAppName("kryoexample")
23????conf.set("spark.serializer",?"org.apache.spark.serializer.KryoSerializer")
24????val?sc?=?new?SparkContext(conf)
25?
26????//create some dummy data
27????val?personList?=?1?to?10000?map (value?=>?new?Person(value +?""))
28????val?personRDD?=?sc.makeRDD(personList)
29?
30????saveAsObjectFile(personRDD, outputPath)
31????val?rdd?=?objectFile[Person](sc, outputPath)
32????println(rdd.map(person?=> person.name).collect().toList)
33??}

轉載于:https://www.cnblogs.com/gaopeng527/p/4962030.html

總結

以上是生活随笔為你收集整理的在Spark中自定义Kryo序列化输入输出API(转)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。