生活随笔
收集整理的這篇文章主要介紹了
在Spark中自定义Kryo序列化输入输出API(转)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
原文鏈接:在Spark中自定義Kryo序列化輸入輸出API
在Spark中內置支持兩種系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默認情況下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有繼承java.io.Serializable的類系列化,雖然Java系列化非常靈活,但是它的性能不佳。然而我們可以使用Kryo 庫來系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化對象,而且要求用戶注冊類。
在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量數據的情況下,使用 Kryo系列化就變得非常重要。
雖然Kryo支持對RDD的cache和shuffle,但是在Spark中不是內置就顯示提供使用Kryo將數據系列化到磁盤中的輸入輸出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法僅僅支持使用Java系列化。所以如果我們可以使用Kryo系列化將會變得很棒!
在這篇文章中,我將討論如何自定義Kryo系列化輸出輸出相關API來將數據進行讀寫到磁盤中。
寫數據
通常,我們使用rdd.saveAsObjectFile?API將已經系列化的對象寫入到磁盤中。下面的代碼將展示如何使用我們自定義的saveAsObjectFile方法將已經使用kryo系列化的對象寫入到磁盤中:
| 1 | def?saveAsObjectFile[T:?ClassTag](rdd:?RDD[T], path:?String) |
這個函數中參數rdd就是我們需要寫的數據;path是數據保存的路徑。
| 1 | val?kryoSerializer?=?new?KryoSerializer(rdd.context.getConf) |
KryoSerializer是Spark內部提供的用于提供操作Kryo的類。在上述代碼中,我們創建了KryoSerializer對象,并從rdd.context.getConf中獲取傳進來的緩存大小。
| 1 | rdd.mapPartitions(iter?=> iter.grouped(10) |
| 3 | ??????.map(splitArray?=> {} |
所有的objectFile 將會在HDFS上保存,我們對RDD中的每個分片進行遍歷,然后將他們轉換成Byte數組。
| 1 | val?kryo?=?kryoSerializer.newKryo() |
對每個splitArray,我們首先創建了kryo實例,kryo是線程不安全的,所以我們在每個map操作中單獨創建。當我們調用?kryoSerializer.newKryo()來創建新的kryo實例,他也會調用我們自定義的KryoRegistrator。
| 1 | //create output stream and plug it to the kryo output |
| 2 | val?bao?=?new?ByteArrayOutputStream() |
| 3 | val?output?=?kryoSerializer.newKryoOutput() |
| 4 | output.setOutputStream(bao) |
| 5 | kryo.writeClassAndObject(output, splitArray) |
一旦我們擁有kryo實例,我們就可以創建kryo輸出對象,然后我們將類信息和對象寫入到那個輸出對象中。
| 1 | val?byteWritable?=?new?BytesWritable(bao.toByteArray) |
| 2 | ??????(NullWritable.get(), byteWritable) |
| 3 | ????}).saveAsSequenceFile(path) |
我們在創建byteWritable的時候,包裝了bytearray,然后保存成Sequence文件。使用那些代碼我們就可以將Kryo對象寫入到磁盤中。完整代碼如下:
| 05 | ?* bolg:?http://www.iteblog.com |
| 06 | ?* 本文地址:http://www.iteblog.com/archives/1328 |
| 07 | ?* 過往記憶博客,專注于hadoop、hive、spark、shark、flume的技術博客,大量的干貨 |
| 08 | ?* 過往記憶博客微信公共帳號:iteblog_hadoop |
| 11 | ??def?saveAsObjectFile[T:?ClassTag](rdd:?RDD[T], path:?String) { |
| 12 | ????val?kryoSerializer?=?new?KryoSerializer(rdd.context.getConf) |
| 14 | ????rdd.mapPartitions(iter?=> iter.grouped(10) |
| 16 | ??????.map(splitArray?=> { |
| 17 | ??????//initializes kyro and calls your registrator class |
| 18 | ??????val?kryo?=?kryoSerializer.newKryo() |
| 20 | ??????//convert data to bytes |
| 21 | ??????val?bao?=?new?ByteArrayOutputStream() |
| 22 | ??????val?output?=?kryoSerializer.newKryoOutput() |
| 23 | ??????output.setOutputStream(bao) |
| 24 | ??????kryo.writeClassAndObject(output, splitArray) |
| 27 | ??????// We are ignoring key field of sequence file |
| 28 | ??????val?byteWritable?=?new?BytesWritable(bao.toByteArray) |
| 29 | ??????(NullWritable.get(), byteWritable) |
| 30 | ????}).saveAsSequenceFile(path) |
讀數據
光有寫沒有讀對我們來說仍然不完美。通常我們使用sparkContext中的objectFile API從磁盤中讀取數據,這里我們使用自定義的objectFile API來讀取Kryo對象文件。
| 01 | def?objectFile[T](sc:?SparkContext, path:?String, minPartitions:?Int?=?1) |
| 02 | ????(implicit?ct:?ClassTag[T])?=?{ |
| 03 | ????val?kryoSerializer?=?new?KryoSerializer(sc.getConf) |
| 04 | ????sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], |
| 07 | ???????val?kryo?=?kryoSerializer.newKryo() |
| 08 | ???????val?input?=?new?Input() |
| 09 | ???????input.setBuffer(x._2.getBytes) |
| 10 | ???????val?data?=?kryo.readClassAndObject(input) |
| 11 | ???????val?dataObject?=?data.asInstanceOf[Array[T]] |
上面的步驟和寫的步驟很類似,只不過這里我們使用的是input,而不是output。我們從BytesWritable中讀取bytes數據,然后使用readClassAndObject API反序列化數據。
如何使用
下面例子使用上面介紹的兩個方法來系列化和反序列化Person對象:
查看源代碼 打印幫助
| 05 | ?* bolg:?http://www.iteblog.com |
| 06 | ?* 本文地址:http://www.iteblog.com/archives/1328 |
| 07 | ?* 過往記憶博客,專注于hadoop、hive、spark、shark、flume的技術博客,大量的干貨 |
| 08 | ?* 過往記憶博客微信公共帳號:iteblog_hadoop |
| 11 | // user defined class that need to serialized |
| 12 | ??class?Person(val?name:?String) |
| 14 | ?def?main(args:?Array[String]) { |
| 16 | ????if?(args.length <?1) { |
| 17 | ??????println("Please provide output path") |
| 20 | ????val?outputPath?=?args(0) |
| 22 | ????val?conf?=?new?SparkConf().setMaster("local").setAppName("kryoexample") |
| 23 | ????conf.set("spark.serializer",?"org.apache.spark.serializer.KryoSerializer") |
| 24 | ????val?sc?=?new?SparkContext(conf) |
| 26 | ????//create some dummy data |
| 27 | ????val?personList?=?1?to?10000?map (value?=>?new?Person(value +?"")) |
| 28 | ????val?personRDD?=?sc.makeRDD(personList) |
| 30 | ????saveAsObjectFile(personRDD, outputPath) |
| 31 | ????val?rdd?=?objectFile[Person](sc, outputPath) |
| 32 | ????println(rdd.map(person?=> person.name).collect().toList) |
轉載于:https://www.cnblogs.com/gaopeng527/p/4962030.html
總結
以上是生活随笔為你收集整理的在Spark中自定义Kryo序列化输入输出API(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。