Spark Java API: Transformation
mapPartitions
Official documentation:
Return a new RDD by applying a function to each partition of this RDD.
mapPartitions calls the partition function on each partition in turn and builds the new RDD from the results (one Iterator per partition).
mapPartitions is similar to map, but it is considerably more efficient when the mapping has to create extra objects repeatedly. For example, when writing every record of an RDD to a database over JDBC, map may have to open a connection for every element, which is expensive; with mapPartitions, one connection per partition is enough.
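The cost difference described above can be modelled without Spark. The sketch below is a plain-Java illustration, not Spark code: partitions are ordinary lists, and `openConnection` is a hypothetical stand-in for an expensive setup step such as opening a JDBC connection. It only counts how often that setup runs under each style.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model (no Spark): counts how often an expensive resource is opened.
class PartitionSetupSketch {
    // Hypothetical counter standing in for real connection setup.
    static int connectionsOpened = 0;

    static void openConnection() {
        connectionsOpened++;
    }

    // map-style: the setup runs once per element.
    static List<Integer> perElement(List<List<Integer>> partitions) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            for (Integer x : partition) {
                openConnection();
                out.add(x * 2);
            }
        }
        return out;
    }

    // mapPartitions-style: the setup runs once per partition, then the iterator is drained.
    static List<Integer> perPartition(List<List<Integer>> partitions) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            openConnection();
            Iterator<Integer> it = partition.iterator();
            while (it.hasNext()) {
                out.add(it.next() * 2);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 4), Arrays.asList(3, 5, 6, 7));
        connectionsOpened = 0;
        perElement(partitions);
        System.out.println("per element:   " + connectionsOpened);
        connectionsOpened = 0;
        perPartition(partitions);
        System.out.println("per partition: " + connectionsOpened);
    }
}
```

Both methods return the same values; only the number of setup calls differs (one per element versus one per partition).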
Function prototypes:
    def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U]
    def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U]
The first function is implemented on top of the second, with preservesPartitioning set to false. The second lets the caller set preservesPartitioning, which states whether the partitioner of the parent RDD is preserved. The Iterator passed to the FlatMapFunction iterates over all elements of one partition of this RDD.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    // The RDD has two partitions
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
    // Compute the sum of each partition
    JavaRDD<Integer> mapPartitionsRDD = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
        @Override
        public Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {
            int isum = 0;
            while (integerIterator.hasNext())
                isum += integerIterator.next();
            LinkedList<Integer> linkedList = new LinkedList<Integer>();
            linkedList.add(isum);
            return linkedList;
        }
    });
    System.out.println("mapPartitionsRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsRDD.collect());
mapPartitionsWithIndex
Official documentation:
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
mapPartitionsWithIndex is essentially the same as mapPartitions, except that the processing function takes two arguments: the index of the partition being processed, and an Iterator over the elements of that partition.
Function prototype:
    def mapPartitionsWithIndex[R](f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R]
Source analysis:
    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }

    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
        preservesPartitioning)
    }
The source shows that mapPartitions already has access to the index of the partition being processed but does not pass it to the partition function, whereas mapPartitionsWithIndex does.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    // The RDD has two partitions
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
    // Output the partition index, the element value and the element number
    JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
            LinkedList<String> linkedList = new LinkedList<String>();
            int i = 0;
            while (v2.hasNext())
                linkedList.add(Integer.toString(v1) + "|" + v2.next().toString() + "|" + Integer.toString(i++));
            return linkedList.iterator();
        }
    }, false);
    System.out.println("mapPartitionsWithIndexRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsWithIndexRDD.collect());
sample
Official documentation:
Return a sampled subset of this RDD.
Function prototypes:
    def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T]
    - withReplacement: can elements be sampled multiple times (replaced when sampled out)
    - fraction: expected size of the sample as a fraction of this RDD’s size
      - without replacement: probability that each element is chosen; fraction must be in [0, 1]
      - with replacement: expected number of times each element is chosen; fraction must be >= 0
    def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T]
    - withReplacement: can elements be sampled multiple times (replaced when sampled out)
    - fraction: expected size of the sample as a fraction of this RDD’s size
      - without replacement: probability that each element is chosen; fraction must be in [0, 1]
      - with replacement: expected number of times each element is chosen; fraction must be >= 0
    - seed: seed for the random number generator
The first function is implemented on top of the second, with seed set to Utils.random.nextLong. withReplacement decides which sampler is built, fraction is the sampling ratio, and seed is the seed of the random number generator.
Source analysis:
    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T] = withScope {
      require(fraction >= 0.0, "Negative fraction value: " + fraction)
      if (withReplacement) {
        new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
      } else {
        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
      }
    }
sample first validates fraction and then builds a PartitionwiseSampledRDD, using a Poisson sampler when withReplacement is true and a Bernoulli sampler when it is false.
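To make the difference between the two samplers concrete, here is a plain-Java sketch of the underlying idea. It does not use Spark and is not how Spark's BernoulliSampler or PoissonSampler are implemented. The class and method names are illustrative, and the Poisson draw uses Knuth's multiplication method as a simple stand-in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Conceptual model of sampling without and with replacement.
class SamplingSketch {
    // Without replacement: keep each element at most once, with probability `fraction`.
    static <T> List<T> bernoulli(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // With replacement: emit each element k times, where k is drawn from Poisson(fraction).
    static <T> List<T> poisson(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            int copies = nextPoisson(rng, fraction);
            for (int i = 0; i < copies; i++) {
                out.add(item);
            }
        }
        return out;
    }

    // Knuth's multiplication method; adequate for the small means used here.
    static int nextPoisson(Random rng, double mean) {
        double limit = Math.exp(-mean);
        double product = 1.0;
        int count = 0;
        do {
            count++;
            product *= rng.nextDouble();
        } while (product > limit);
        return count - 1;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
        System.out.println("without replacement: " + bernoulli(data, 0.5, 100L));
        System.out.println("with replacement:    " + poisson(data, 0.5, 100L));
    }
}
```

With the Bernoulli variant an element can appear at most once in the sample; with the Poisson variant the same element may appear several times, which is why fraction may exceed 1 in that mode.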
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // false: Bernoulli sampling (each element is sampled at most once); 0.2: sampling fraction; 100: seed of the random number generator
    JavaRDD<Integer> sampleRDD = javaRDD.sample(false, 0.2, 100);
    System.out.println("sampleRDD~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD.collect());
    // true: Poisson sampling (an element can be sampled several times); 0.2: sampling fraction; 100: seed of the random number generator
    JavaRDD<Integer> sampleRDD1 = javaRDD.sample(true, 0.2, 100);
    System.out.println("sampleRDD1~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD1.collect());
randomSplit
Official documentation:
Randomly splits this RDD with the provided weights.
Function prototypes:
    def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]]
    - weights: weights for splits, will be normalized if they don’t sum to 1
    - seed: random seed
    - return: split RDDs in an array
    def randomSplit(weights: Array[Double]): Array[JavaRDD[T]]
    - weights: weights for splits, will be normalized if they don’t sum to 1
    - return: split RDDs in an array
Source analysis:
    def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
      val sum = weights.sum
      val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
      normalizedCumWeights.sliding(2).map { x =>
        randomSampleWithRange(x(0), x(1), seed)
      }.toArray
    }

    def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
      this.mapPartitionsWithIndex({ (index, partition) =>
        val sampler = new BernoulliCellSampler[T](lb, ub)
        sampler.setSeed(seed + index)
        sampler.sample(partition)
      }, preservesPartitioning = true)
    }
The source shows that randomSplit first normalizes the weight array so that the weights sum to 1 and turns it into cumulative boundaries. It then calls randomSampleWithRange once per pair of adjacent boundaries. That function uses mapPartitionsWithIndex (described in an earlier section) to build a Bernoulli cell sampler that keeps the elements whose random draw falls inside the given range.
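The normalization and range assignment can be sketched in plain Java. This is a conceptual model rather than Spark code: Spark makes one pass per split and reuses the same seed so every element gets the same draw in each pass, whereas this sketch makes a single pass with one draw per element. The names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Conceptual model of randomSplit: cumulative weight ranges plus one random draw per element.
class RandomSplitSketch {
    // Normalizes the weights and returns cumulative boundaries starting at 0.0.
    static double[] cumulativeBounds(double[] weights) {
        double sum = 0.0;
        for (double w : weights) {
            sum += w;
        }
        double[] bounds = new double[weights.length + 1];
        for (int i = 0; i < weights.length; i++) {
            bounds[i + 1] = bounds[i] + weights[i] / sum;
        }
        return bounds;
    }

    static <T> List<List<T>> split(List<T> data, double[] weights, long seed) {
        double[] bounds = cumulativeBounds(weights);
        List<List<T>> splits = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            splits.add(new ArrayList<>());
        }
        Random rng = new Random(seed);
        for (T item : data) {
            double x = rng.nextDouble();
            // Fallback to the last split guards against rounding in the final boundary.
            int target = weights.length - 1;
            for (int i = 0; i < weights.length; i++) {
                if (x >= bounds[i] && x < bounds[i + 1]) {
                    target = i;
                    break;
                }
            }
            splits.get(target).add(item);
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cumulativeBounds(new double[] {1, 1, 2})));
        System.out.println(split(Arrays.asList(1, 2, 4, 3, 5, 6, 7), new double[] {1, 1, 2}, 100L));
    }
}
```

Because every element receives exactly one draw and the ranges do not overlap, the splits are disjoint and together cover the input.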
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    double[] weights = {0.1, 0.2, 0.7};
    // Randomly split the RDD according to the given weights
    JavaRDD<Integer>[] randomSplitRDDs = javaRDD.randomSplit(weights);
    System.out.println("randomSplitRDDs of size~~~~~~~~~~~~~~" + randomSplitRDDs.length);
    int i = 0;
    for (JavaRDD<Integer> item : randomSplitRDDs)
        System.out.println(i++ + " randomSplitRDDs of item~~~~~~~~~~~~~~~~" + item.collect());
union
Official documentation:
Return the union of this RDD and another one. Any identical elements will appear multiple times (use `.distinct()` to eliminate them).
Function prototype:
    def union(other: JavaRDD[T]): JavaRDD[T]
union() simply concatenates two RDDs without changing the data inside any partition. The RangeDependency involved is effectively one-to-one; it keeps the range boundaries of the original RDDs only to make it easy to locate a partition in the RDD produced by union().
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaRDD<Integer> unionRDD = javaRDD.union(javaRDD);
    System.out.println("unionRDD~~~~~~~~~~~~~~~~~~~~~~" + unionRDD.collect());
intersection
Official documentation:
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally.
Function prototype:
    def intersection(other: JavaRDD[T]): JavaRDD[T]
Source analysis:
    def intersection(other: RDD[T]): RDD[T] = withScope {
      this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
    }
map() first turns RDD[T] into RDD[(T, null)]; T can be any type other than Array or a similar collection type. Next, a.cogroup(b) is performed (cogroup is covered in detail later). filter() then drops the records of [iter(groupA()), iter(groupB())] in which groupA or groupB is empty, giving a FilteredRDD. Finally, keys() keeps only the keys, giving a MappedRDD.
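The map, cogroup, filter, keys pipeline can be imitated on ordinary lists. The sketch below is a plain-Java model, not Spark code; the class name is illustrative, and a two-slot counter stands in for the pair of groups that cogroup would produce for each key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of intersection(): group by element, keep elements seen on both sides.
class IntersectionSketch {
    static <T> List<T> intersection(List<T> left, List<T> right) {
        // cogroup step: per key, count how many records each side contributed
        Map<T, int[]> groups = new LinkedHashMap<>();
        for (T v : left) {
            groups.computeIfAbsent(v, k -> new int[2])[0]++;
        }
        for (T v : right) {
            groups.computeIfAbsent(v, k -> new int[2])[1]++;
        }
        // filter step: keep keys whose group is non-empty on both sides
        // keys step: emit each surviving key exactly once
        List<T> out = new ArrayList<>();
        for (Map.Entry<T, int[]> e : groups.entrySet()) {
            if (e.getValue()[0] > 0 && e.getValue()[1] > 0) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(intersection(Arrays.asList(1, 2, 2, 3), Arrays.asList(2, 3, 3, 4)));
    }
}
```

Since each key appears once in the grouped map, duplicates in either input disappear from the result, which matches the documented behaviour.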
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaRDD<Integer> intersectionRDD = javaRDD.intersection(javaRDD);
    System.out.println(intersectionRDD.collect());
coalesce
Official documentation:
Return a new RDD that is reduced into `numPartitions` partitions.
Function prototypes:
    def coalesce(numPartitions: Int): JavaRDD[T]
    def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
Source analysis:
    def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]

        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
          new HashPartitioner(numPartitions)),
          numPartitions).values
      } else {
        new CoalescedRDD(this, numPartitions)
      }
    }
The source shows that when shuffle = false no shuffle takes place, so the problem reduces to deciding which partitions of the parent RDD can be merged; partitions are merged until numPartitions partitions remain. When shuffle = true a shuffle is performed, and the principle is simple. Each record in a partition is first turned into a key-value pair. The key starts from (new Random(index)).nextInt(numPartitions) and is incremented by 1 for each record, the value is the record itself, index is the index of the partition, and numPartitions is the number of partitions of the CoalescedRDD. The shuffle then produces a ShuffledRDD whose records are evenly distributed, a CoalescedRDD is built on top of that ShuffledRDD, and finally the keys are dropped, which gives the coalesced result, a MappedRDD.
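The key assignment used when shuffle = true can be reproduced in a few lines of plain Java. The sketch is a model of distributePartition only, under the assumption stated in the source comment that hash partitioning an Int key amounts to key mod numPartitions. It does not use Spark, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Conceptual model of how coalesce(shuffle = true) spreads one input partition.
class CoalesceShuffleSketch {
    static <T> List<List<T>> distribute(int index, List<T> items, int numPartitions) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        // Start from a position derived from the partition index, as in the source.
        int position = new Random(index).nextInt(numPartitions);
        for (T item : items) {
            position = position + 1;                      // key of this record
            out.get(position % numPartitions).add(item);  // key mod n picks the target partition
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distribute(0, Arrays.asList(1, 2, 4, 3, 5, 6, 7), 2));
    }
}
```

Consecutive keys cycle through the output partitions, so the records of each input partition end up spread as evenly as possible, whatever the starting position is.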
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // shuffle defaults to false
    JavaRDD<Integer> coalesceRDD = javaRDD.coalesce(2);
    System.out.println(coalesceRDD);
    JavaRDD<Integer> coalesceRDD1 = javaRDD.coalesce(2, true);
    System.out.println(coalesceRDD1);
Note:
coalesce() can adjust the number of partitions of the parent RDD, for example reduce it from 5 to 3 or increase it from 5 to 10. Be aware that when shuffle = false the number of partitions cannot be increased (that is, it cannot go from 5 to 10).
repartition
Official documentation:
Return a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle.
In particular, when the goal is to reduce the number of partitions of an RDD, coalesce with shuffle set to false can be used instead of repartition; it avoids the shuffle and is therefore more efficient.
Function prototype:
    def repartition(numPartitions: Int): JavaRDD[T]
Source analysis:
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
The source shows that repartition is equivalent to coalesce(numPartitions, shuffle = true).
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Equivalent to coalesce(numPartitions, shuffle = true)
    JavaRDD<Integer> repartitionRDD = javaRDD.repartition(2);
    System.out.println(repartitionRDD);
cartesian
Official documentation:
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in `this` and b is in `other`.
Function prototype:
    def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
Source analysis:
    def getPartitions: Array[Partition] = {
      // create the cross product split
      val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
      for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
        val idx = s1.index * numPartitionsInRdd2 + s2.index
        array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
      }
      array
    }

    def getDependencies: Seq[Dependency[_]] = List(
      new NarrowDependency(rdd1) {
        def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
      },
      new NarrowDependency(rdd2) {
        def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
      }
    )
cartesian computes the Cartesian product of two RDDs. The number of partitions in the resulting CartesianRDD is partitionNum(RDD a) * partitionNum(RDD b). getDependencies shows that the dependencies here differ from the earlier ones: every partition of the CartesianRDD depends on both parent RDDs. Each partition depends fully (NarrowDependency) on one partition of RDD a and, at the same time, fully (NarrowDependency) on one partition of RDD b. Specifically, partition i of the CartesianRDD depends on (RDD a).List(i / numPartitionsInRDDb) and (RDD b).List(i % numPartitionsInRDDb).
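The index arithmetic in getPartitions and getDependencies is easy to check in isolation. The following plain-Java sketch mirrors the two formulas from the source; the class and method names are illustrative.

```java
import java.util.Arrays;

// Index arithmetic of CartesianRDD partitions, mirrored from the source above.
class CartesianIndexSketch {
    // Child partition index for the pair (index1 in rdd1, index2 in rdd2).
    static int childIndex(int index1, int index2, int numPartitionsInRdd2) {
        return index1 * numPartitionsInRdd2 + index2;
    }

    // Parent partition indices {in rdd1, in rdd2} that a child partition depends on.
    static int[] parents(int childIndex, int numPartitionsInRdd2) {
        return new int[] {childIndex / numPartitionsInRdd2, childIndex % numPartitionsInRdd2};
    }

    public static void main(String[] args) {
        int numPartitionsInRdd2 = 3;
        for (int idx = 0; idx < 2 * numPartitionsInRdd2; idx++) {
            System.out.println(idx + " -> " + Arrays.toString(parents(idx, numPartitionsInRdd2)));
        }
    }
}
```

Division and remainder invert the formula used to build idx, so each child partition maps back to exactly one partition of each parent.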
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> cartesianRDD = javaRDD.cartesian(javaRDD);
    System.out.println(cartesianRDD.collect());
distinct
Official documentation:
Return a new RDD containing the distinct elements in this RDD.
Function prototypes:
    def distinct(): JavaRDD[T]
    def distinct(numPartitions: Int): JavaRDD[T]
The first function is implemented on top of the second; numPartitions simply defaults to partitions.length, where partitions are the partitions of the parent RDD.
Source analysis:
    def distinct(): RDD[T] = withScope {
      distinct(partitions.length)
    }

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
distinct() removes all duplicate records from an RDD. Because duplicates may be scattered over different partitions, a shuffle is needed to aggregate them before they can be removed. However, a shuffle requires key-value records, which is why the source first maps every element x to (x, null).
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaRDD<Integer> distinctRDD1 = javaRDD.distinct();
    System.out.println(distinctRDD1.collect());
    JavaRDD<Integer> distinctRDD2 = javaRDD.distinct(2);
    System.out.println(distinctRDD2.collect());
aggregate
Official documentation:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
Function prototype:
    def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U
Source analysis:
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
      // Clone the zero value since we will also be serializing it as part of tasks
      var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
      val cleanSeqOp = sc.clean(seqOp)
      val cleanCombOp = sc.clean(combOp)
      val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
      val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
      sc.runJob(this, aggregatePartition, mergeResult)
      jobResult
    }
aggregate first aggregates the elements inside each partition and then uses the combine function to merge the result of every partition with the initial value (zeroValue). The result type U does not have to match the element type T of the RDD, so two functions are needed: one that merges an element of type T into a U, and one that merges two U values. Parameter 1 is the initial value; parameter 2 is the seq function, which folds each element into the running value, starting from the initial value; parameter 3 is the comb function, which merges partial results.
Note: aggregate is computed over every partition of the RDD; a partition without elements contributes the initial value.
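How often zeroValue enters the result is easiest to see in a small model. The plain-Java sketch below follows the structure of the source (one fold per partition, then a merge that itself starts from zeroValue), simplified to Integer for both T and U. The partition layout in `main` is an assumption made for the illustration, and partial results are merged in list order here, whereas Spark merges them as tasks finish.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Conceptual model of RDD.aggregate over explicitly given partitions.
class AggregateSketch {
    static int aggregate(List<List<Integer>> partitions, int zeroValue,
                         BinaryOperator<Integer> seqOp, BinaryOperator<Integer> combOp) {
        int jobResult = zeroValue;              // the final merge also starts from zeroValue
        for (List<Integer> partition : partitions) {
            int partial = zeroValue;            // every partition starts from zeroValue
            for (Integer x : partition) {
                partial = seqOp.apply(partial, x);
            }
            jobResult = combOp.apply(jobResult, partial);
        }
        return jobResult;
    }

    public static void main(String[] args) {
        // Assumed layout of the data 5, 1, 1, 4, 4, 2, 2 over three partitions.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
        // seqOp keeps the maximum, combOp adds partial results: 3 + 5 + 4 + 4
        System.out.println(aggregate(partitions, 3, Integer::max, Integer::sum));
    }
}
```

With max as seqOp and addition as combOp, the partial results are 5, 4 and 4, and the initial value 3 is added once more in the merge, giving 16 for this layout. A zeroValue that is not neutral for combOp therefore changes the result depending on the number of partitions.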
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    Integer aggregateValue = javaRDD.aggregate(3, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2);
            return Math.max(v1, v2);
        }
    }, new Function2<Integer, Integer, Integer>() {
        int i = 0;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~" + i++);
            System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1);
            System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2);
            return v1 + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateValue);
aggregateByKey
Official documentation:
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
Function prototypes:
    def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
    def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
    def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
Source analysis:
    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
      // We will clean the combiner closure later in `combineByKey`
      val cleanedSeqOp = self.context.clean(seqOp)
      combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
    }
aggregateByKey aggregates the values that share a key in a pair RDD, again starting from a neutral initial value. As with aggregate, the type of the result does not have to match the type of the values in the RDD. Because aggregateByKey aggregates the values of each key, it returns a pair RDD holding each key with its aggregated value, whereas aggregate returns a plain value rather than an RDD; this difference is worth noting. Three aggregateByKey prototypes are defined, but they all end up calling the same implementation. The parameter zeroValue is the initial value, partitioner is the partitioning function, seq is the function that folds each value into the running value starting from the initial value, and comb is the function that merges partial results.
Example:
    // In words: the values of the data set are grouped and merged by key.
    // While merging, seq compares each value with the given initial value and keeps the larger one; comb then defines how partial results are merged.
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    int numPartitions = 4;
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random random = new Random(100);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + javaPairRDD.collect());
    JavaPairRDD<Integer, Integer> aggregateByKeyRDD = javaPairRDD.aggregateByKey(3, numPartitions, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2);
            return Math.max(v1, v2);
        }
    }, new Function2<Integer, Integer, Integer>() {
        int i = 0;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~" + i++);
            System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1);
            System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2);
            return v1 + v2;
        }
    });
    System.out.println("aggregateByKeyRDD.partitions().size()~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateByKeyRDD.partitions().size());
    System.out.println("aggregateByKeyRDD~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateByKeyRDD.collect());
cogroup
Official documentation:
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
Function prototypes:
    def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W])]
    def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
    def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
    def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])]
    def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
    def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
    def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W])]
    def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
    def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
Source analysis:
    def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
      if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
      val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
      cg.mapValues { case Array(vs, w1s) =>
        (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
      }
    }

    override def getDependencies: Seq[Dependency[_]] = {
      rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
        if (rdd.partitioner == Some(part)) {
          logDebug("Adding one-to-one dependency with " + rdd)
          new OneToOneDependency(rdd)
        } else {
          logDebug("Adding shuffle dependency with " + rdd)
          new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
        }
      }
    }

    override def getPartitions: Array[Partition] = {
      val array = new Array[Partition](part.numPartitions)
      for (i <- 0 until array.length) {
        // Each CoGroupPartition will have a dependency per contributing RDD
        array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
          // Assume each RDD contributed a single dependency, and get it
          dependencies(j) match {
            case s: ShuffleDependency[_, _, _] =>
              None
            case _ =>
              Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
          }
        }.toArray)
      }
      array
    }
Which partition of the CoGroupedRDD a cogroup() result is placed in is determined by the partitioner the user supplies (HashPartitioner by default).
All RDDs that the CoGroupedRDD depends on are placed in the array rdds[RDD]. Then, for each i, if the CoGroupedRDD and the RDD at rdds(i) are in a OneToOneDependency relationship, Dependency[i] = new OneToOneDependency(rdd); otherwise Dependency[i] = new ShuffleDependency(rdd). Finally, the array of dependencies on each parent RDD, deps[Dependency], is returned.
getParents(partition id) in the Dependency class returns, for a given partition, the partitions of the parent RDD that it depends on under that dependency, as a List[Int].
getPartitions() states how many partitions the RDD has and how each partition is serialized.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });
    // Unlike groupByKey(), cogroup() aggregates two or more RDDs.
    JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = javaPairRDD.cogroup(javaPairRDD);
    System.out.println(cogroupRDD.collect());
    JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD3 = javaPairRDD.cogroup(javaPairRDD, new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            // Mask the sign bit so the partition index is never negative
            return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions();
        }
    });
    System.out.println(cogroupRDD3);
join
Official documentation:
Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. Performs a hash join across the cluster.
Function prototypes:
    def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)]
    def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)]
    def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)]
Source analysis:
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues( pair =>
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      )
    }
The source shows that join() combines two RDD[(K, V)] in the manner of a SQL join. As with intersection(), cogroup() is performed first, which yields the two groups of values for each key; flatMapValues then pairs every value of the first group with every value of the second, as the for comprehension in the source shows.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    final Random random = new Random();
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = javaPairRDD.join(javaPairRDD);
    System.out.println(joinRDD.collect());
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD2 = javaPairRDD.join(javaPairRDD, 2);
    System.out.println(joinRDD2.collect());
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD3 = javaPairRDD.join(javaPairRDD, new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            // Mask the sign bit so the partition index is never negative
            return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions();
        }
    });
    System.out.println(joinRDD3.collect());
fullOuterJoin
Official documentation:
Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
    def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])]
    def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])]
    def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (Optional[V], Optional[W])]
Source analysis:
    def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Option[V], Option[W]))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues {
        case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
        case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
        case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
      }
    }
The source shows that fullOuterJoin() is similar to join(): cogroup() is performed first, which yields a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>; the Cartesian product of Iterable[V1] and Iterable[V2] is then taken and the result is flattened. Note that None is substituted on either side, V1 or V2, when that side has no values for the key.
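The three cases of the pattern match can be replayed on ordinary maps. The plain-Java sketch below is a conceptual model and not Spark code: each side is assumed to be already grouped by key (the state after cogroup), keys are visited in sorted order for readability, and rows are rendered as strings. The class name and the `row` helper are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

// Conceptual model of fullOuterJoin on inputs that are already grouped by key.
class FullOuterJoinSketch {
    static List<String> fullOuterJoin(Map<Integer, List<String>> left,
                                      Map<Integer, List<String>> right) {
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<String> out = new ArrayList<>();
        for (Integer k : keys) {
            // cogroup step: the values for k on each side, possibly empty
            List<String> vs = left.getOrDefault(k, Collections.<String>emptyList());
            List<String> ws = right.getOrDefault(k, Collections.<String>emptyList());
            if (ws.isEmpty()) {
                for (String v : vs) {
                    out.add(row(k, Optional.of(v), Optional.<String>empty()));
                }
            } else if (vs.isEmpty()) {
                for (String w : ws) {
                    out.add(row(k, Optional.<String>empty(), Optional.of(w)));
                }
            } else {
                for (String v : vs) {
                    for (String w : ws) {
                        out.add(row(k, Optional.of(v), Optional.of(w)));
                    }
                }
            }
        }
        return out;
    }

    static String row(Integer k, Optional<String> v, Optional<String> w) {
        return "(" + k + ",(" + v.orElse("None") + "," + w.orElse("None") + "))";
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> left = new HashMap<>();
        left.put(1, Arrays.asList("a"));
        left.put(2, Arrays.asList("b"));
        Map<Integer, List<String>> right = new HashMap<>();
        right.put(2, Arrays.asList("x", "y"));
        right.put(3, Arrays.asList("z"));
        System.out.println(fullOuterJoin(left, right));
    }
}
```

Dropping the first branch gives the shape of a right outer join, dropping the second gives a left outer join, and dropping both gives the inner join.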
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    final Random random = new Random();
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });
    // Full outer join
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD = javaPairRDD.fullOuterJoin(javaPairRDD);
    System.out.println(fullJoinRDD);
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD1 = javaPairRDD.fullOuterJoin(javaPairRDD, 2);
    System.out.println(fullJoinRDD1);
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD2 = javaPairRDD.fullOuterJoin(javaPairRDD, new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            // Mask the sign bit so the partition index is never negative
            return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions();
        }
    });
    System.out.println(fullJoinRDD2);
leftOuterJoin
Official documentation:
Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
    def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])]
    def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])]
    def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, Optional[W])]
Source analysis:
    def leftOuterJoin[W](
        other: RDD[(K, W)],
        partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._2.isEmpty) {
          pair._1.iterator.map(v => (v, None))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
        }
      }
    }
The source shows that leftOuterJoin() is similar to fullOuterJoin(): cogroup() is performed first, which yields a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>; the Cartesian product of Iterable[V1] and Iterable[V2] is then taken and the result is flattened. Note that None is substituted on the V2 side when `other` has no values for the key.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    final Random random = new Random();
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });
    // Left outer join
    JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD = javaPairRDD.leftOuterJoin(javaPairRDD);
    System.out.println(leftJoinRDD);
    JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD1 = javaPairRDD.leftOuterJoin(javaPairRDD, 2);
    System.out.println(leftJoinRDD1);
    JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD2 = javaPairRDD.leftOuterJoin(javaPairRDD, new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            // Mask the sign bit so the partition index is never negative
            return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions();
        }
    });
    System.out.println(leftJoinRDD2);
rightOuterJoin
Official documentation:
Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
    def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)]
    def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)]
    def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (Optional[V], W)]
Source analysis:
    def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
        : RDD[(K, (Option[V], W))] = self.withScope {
      this.cogroup(other, partitioner).flatMapValues { pair =>
        if (pair._1.isEmpty) {
          pair._2.iterator.map(w => (None, w))
        } else {
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
        }
      }
    }
The source shows that rightOuterJoin() is similar to fullOuterJoin(): cogroup() is performed first, which yields a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>; the Cartesian product of Iterable[V1] and Iterable[V2] is then taken and the result is flattened. Note that None is substituted on the V1 side when `this` has no values for the key.
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    final Random random = new Random();
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });

    // Right outer join
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD = javaPairRDD.rightOuterJoin(javaPairRDD);
    System.out.println(rightJoinRDD.collect());

    // Right outer join with an explicit number of partitions
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD1 = javaPairRDD.rightOuterJoin(javaPairRDD, 2);
    System.out.println(rightJoinRDD1.collect());

    // Right outer join with a custom partitioner
    JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD2 = javaPairRDD.rightOuterJoin(javaPairRDD, new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            return (key.toString()).hashCode() % numPartitions();
        }
    });
    System.out.println(rightJoinRDD2.collect());

sortByKey
Official documentation:
Sort the RDD by key, so that each partition contains a sorted range of the elements in ascending order. Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in order of the keys).
Function prototypes:
    def sortByKey(): JavaPairRDD[K, V]
    def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
    def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
Source code analysis:
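    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope {
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }

sortByKey() sorts the records of an RDD[(K, V)] by key; ascending = true means ascending order and false means descending order. The data dependency is simple. A shuffle first gathers the records into their target partitions (the RangePartitioner gives each partition a contiguous range of keys), then the records within each partition are sorted by key, so reading the partitions in order yields sorted records. In the code shown here the ordering is handed to the ShuffledRDD through setKeyOrdering(), so the sort happens as part of the shuffle. Earlier implementations instead copied all records of a partition into an Array inside a MapPartitionsRDD and sorted that.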
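The "range partition first, then sort inside each partition" idea can be illustrated in plain Java. This is only a sketch under stated assumptions: `SortByKeySketch` and the hand-picked `upperBounds` array are hypothetical, whereas Spark's RangePartitioner derives its boundaries from the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: lists stand in for partitions (hypothetical names).
final class SortByKeySketch {

    // upperBounds[i] is the largest key allowed in partition i; the last partition takes the rest.
    static List<List<Integer>> sortByKey(List<Integer> keys, int[] upperBounds) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i <= upperBounds.length; i++) partitions.add(new ArrayList<Integer>());
        for (int key : keys) {
            int p = 0;
            while (p < upperBounds.length && key > upperBounds[p]) p++; // find the key's range
            partitions.get(p).add(key);
        }
        // Sorting each range locally is enough: the ranges themselves are already ordered.
        for (List<Integer> partition : partitions) Collections.sort(partition);
        return partitions;
    }

    static List<List<Integer>> demo() {
        return sortByKey(Arrays.asList(5, 1, 7, 3, 2, 6, 4), new int[] {3});
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Concatenating the partitions in index order gives a fully sorted sequence, which is why collect() on the sorted RDD returns an ordered list.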
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random random = new Random(100);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });

    JavaPairRDD<Integer, Integer> sortByKeyRDD = javaPairRDD.sortByKey();
    System.out.println(sortByKeyRDD.collect());

repartitionAndSortWithinPartitions
Official documentation:
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling `repartition` and then sorting within each partition because it can push the sorting down into the shuffle machinery.
Function prototypes:
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V]
    def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]): JavaPairRDD[K, V]
Source code analysis:
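    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
    }

As the source shows, this method partitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. Like sortByKey(), it sets the key ordering directly on the ShuffledRDD, so it is more efficient than repartitioning first and then sorting each partition, because the sort is folded into the shuffle stage.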
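The difference from sortByKey() is that the partitioner is arbitrary, so only the order inside each partition is guaranteed. A plain-Java sketch of that behaviour follows; `RepartitionAndSortSketch` is a hypothetical name, and the string-hash partitioning rule is borrowed from this article's examples (with `Math.floorMod`, Java 8+, to keep the index non-negative).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: lists stand in for partitions (hypothetical names).
final class RepartitionAndSortSketch {

    static List<List<Integer>> repartitionAndSort(List<Integer> keys, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<Integer>());
        for (Integer key : keys) {
            // Same rule as the custom Partitioner in the examples, made safe for negative hashes.
            int p = Math.floorMod(key.toString().hashCode(), numPartitions);
            partitions.get(p).add(key);
        }
        for (List<Integer> partition : partitions) Collections.sort(partition); // per-partition order only
        return partitions;
    }

    static List<List<Integer>> demo() {
        return repartitionAndSort(Arrays.asList(7, 6, 5, 4, 3, 2, 1), 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each partition comes out sorted, but reading the partitions one after another does not give a globally sorted sequence.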
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random random = new Random();
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });

    JavaPairRDD<Integer, Integer> repartitionAndSortRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            return key.toString().hashCode() % numPartitions();
        }
    });
    System.out.println(repartitionAndSortRDD.collect());

combineByKey
Official documentation:
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
- `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
- `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
- `mergeCombiners`, to combine two C's into a single one.
In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
Function prototypes:
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], numPartitions: Int): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner, mapSideCombine: Boolean, serializer: Serializer): JavaPairRDD[K, C]
This function turns an RDD[K, V] into an RDD[K, C], where the types V and C may be the same or different.
The parameters are:
- createCombiner: converts a value of type V from the input RDD[K, V] into the combined type C of the output RDD[K, C];
- mergeValue: merges a value of type V into an existing combiner of type C; it takes (C, V) and returns C;
- mergeCombiners: merges two combiners of type C into one; it takes (C, C) and returns C;
- numPartitions: the number of partitions used by the default HashPartitioner;
- partitioner: the partitioning function; the default is HashPartitioner;
- mapSideCombine: a flag that controls whether to combine on the map side, similar to a combiner in MapReduce; the default is true.
Source code analysis:
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }

As the source shows, combineByKey() aggregates the records while compute() streams them through. Suppose a group of <K, V> records with the same K flows into combineByKey() one at a time. createCombiner initializes c from the value of the first record (for example, c = value). From the second record on, every arriving record updates c through mergeValue(c, record.value); to sum all the values, for instance, use c = c + record.value. Once every record has gone through mergeValue(), the result is c. Now suppose a second group of records with the same key arrives and is reduced in the same way to c'. The combined result for both groups is then final c = mergeCombiners(c, c'). The partitioner decides in which partition the partial results for a key are merged. When the RDD is already partitioned by the given partitioner, the code aggregates in place with mapPartitions() and skips the shuffle.
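The flow of c, c' and mergeCombiners(c, c') described above can be traced in plain Java. This is a sketch, not Spark's Aggregator: the class `CombineByKeySketch`, the int[][] record layout of {key, value}, and the tracing labels C(..), +V(..) and | are assumptions chosen so that each call is visible in the result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustration only: arrays of {key, value} stand in for partitions (hypothetical names).
final class CombineByKeySketch {

    // Folds one partition into one combiner per key.
    static <C> Map<Integer, C> combinePartition(int[][] records,
            Function<Integer, C> createCombiner, BiFunction<C, Integer, C> mergeValue) {
        Map<Integer, C> combiners = new TreeMap<>();
        for (int[] r : records) {
            C c = combiners.get(r[0]);
            // First value of a key in this partition: createCombiner; afterwards: mergeValue.
            combiners.put(r[0], c == null ? createCombiner.apply(r[1]) : mergeValue.apply(c, r[1]));
        }
        return combiners;
    }

    // Merges the per-partition combiners of each key with mergeCombiners.
    static <C> Map<Integer, C> combineByKey(List<int[][]> partitions,
            Function<Integer, C> createCombiner, BiFunction<C, Integer, C> mergeValue,
            BinaryOperator<C> mergeCombiners) {
        Map<Integer, C> result = new TreeMap<>();
        for (int[][] partition : partitions) {
            Map<Integer, C> partial = combinePartition(partition, createCombiner, mergeValue);
            for (Map.Entry<Integer, C> e : partial.entrySet()) {
                result.merge(e.getKey(), e.getValue(), mergeCombiners);
            }
        }
        return result;
    }

    static Map<Integer, String> demo() {
        List<int[][]> partitions = Arrays.asList(
                new int[][] {{1, 10}, {2, 20}, {1, 11}},
                new int[][] {{1, 12}, {3, 30}});
        return combineByKey(partitions,
                v -> "C(" + v + ")",
                (c, v) -> c + "+V(" + v + ")",
                (c1, c2) -> c1 + "|" + c2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For key 1 the trace shows one createCombiner and one mergeValue in the first partition, one createCombiner in the second, and a single mergeCombiners joining the two partial results.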
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to a pair RDD
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    JavaPairRDD<Integer, String> combineByKeyRDD = javaPairRDD.combineByKey(new Function<Integer, String>() {
        @Override
        public String call(Integer v1) throws Exception {
            return v1 + " :createCombiner: ";
        }
    }, new Function2<String, Integer, String>() {
        @Override
        public String call(String v1, Integer v2) throws Exception {
            return v1 + " :mergeValue: " + v2;
        }
    }, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + " :mergeCombiners: " + v2;
        }
    });
    System.out.println("result~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + combineByKeyRDD.collect());

groupByKey
Official documentation:
Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
Function prototypes:
    def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]]
    def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]]
Source code analysis:
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKey[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

As the source shows, groupByKey() is built on combineByKey(). It only gathers the records that share a key, so a simple shuffle is enough. The ShuffledRDD fetches the data that belongs to each partition and the aggregator groups it by key (older versions did this in a separate mapPartitions() step that produced a MapPartitionsRDD), and at that point groupByKey() is done. Finally, to keep the return type uniform, the CompactBuffer that holds the values of each key is exposed as an Iterable. groupByKey() does not combine on the map side (mapSideCombine = false), because for grouping a map-side combine only saves the space taken by repeated keys within a partition and does not reduce the number of values shuffled. When keys repeat very heavily a map-side combine might still be worth considering, but groupByKey() hard-codes mapSideCombine = false, so that means calling combineByKey() directly.
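Why combining cannot shrink the data when grouping becomes clear from a small plain-Java sketch. `GroupByKeySketch`, the {key, value} array layout, and the ArrayList buffers (standing in for CompactBuffer) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: an array of {key, value} records stands in for an RDD (hypothetical names).
final class GroupByKeySketch {

    static Map<Integer, List<Integer>> groupByKey(int[][] records) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int[] r : records) {
            List<Integer> buf = groups.get(r[0]);
            if (buf == null) {
                buf = new ArrayList<>(); // new key: start a buffer (the createCombiner role)
                groups.put(r[0], buf);
            }
            buf.add(r[1]);               // append the value (the mergeValue role for later values)
        }
        return groups;
    }

    // Counts the values still carried by the grouped result.
    static int valueCount(Map<Integer, List<Integer>> groups) {
        int n = 0;
        for (List<Integer> buf : groups.values()) n += buf.size();
        return n;
    }

    static int[][] sampleRecords() {
        return new int[][] {{1, 10}, {2, 20}, {1, 11}, {1, 12}};
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups = groupByKey(sampleRecords());
        System.out.println(groups + " carries " + valueCount(groups) + " values");
    }
}
```

The grouped result carries exactly as many values as there were input records, so grouping before the shuffle would only save the repeated keys.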
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to (K, V) pairs
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2);
    System.out.println(groupByKeyRDD.collect());

    // Custom partitioner
    JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() {
        // Number of partitions
        @Override
        public int numPartitions() {
            return 10;
        }
        // How a key is mapped to a partition
        @Override
        public int getPartition(Object o) {
            return (o.toString()).hashCode() % numPartitions();
        }
    });
    System.out.println(groupByKeyRDD3.collect());

reduceByKey
Official documentation:
Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
Function prototypes:
    def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V]
This function applies a reduce function to the values V that belong to each key K.
The parameters are:
- func: the reduce function, defined by the user as needed;
- partitioner: the partitioning function;
- numPartitions: the number of partitions; the default partitioning function is HashPartitioner.
Source code analysis:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

As the source shows, reduceByKey() is built on combineByKey(): createCombiner is a plain identity conversion, while mergeValue and mergeCombiners are both the user-supplied function. reduceByKey() corresponds to classic MapReduce, and the overall data flow is essentially the same as in Hadoop. combineByKey() enables the map-side combine by default, so reduceByKey() combines on the map side too: values are first combined within each partition before the shuffle, the shuffle then moves the partial results, and a final reduce merges them per key. Older versions expressed these steps as MapPartitionsRDD, ShuffledRDD and MapPartitionsRDD; in the code shown here the Aggregator attached to the ShuffledRDD drives both the map-side combine and the reduce-side merge.
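The effect of the map-side combine can be sketched in plain Java by reducing each partition first and then merging the partial results with the same function. `ReduceByKeySketch`, the {key, value} array layout, and the `shuffledRecordCount` helper are hypothetical names introduced for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Illustration only: arrays of {key, value} stand in for partitions (hypothetical names).
final class ReduceByKeySketch {

    // Map-side combine: one partial result per key and partition.
    static Map<Integer, Integer> reducePartition(int[][] records, BinaryOperator<Integer> func) {
        Map<Integer, Integer> acc = new TreeMap<>();
        for (int[] r : records) acc.merge(r[0], r[1], func);
        return acc;
    }

    // Reduce side: merge the partial results of all partitions with the same function.
    static Map<Integer, Integer> reduceByKey(List<int[][]> partitions, BinaryOperator<Integer> func) {
        Map<Integer, Integer> out = new TreeMap<>();
        for (int[][] partition : partitions)
            for (Map.Entry<Integer, Integer> e : reducePartition(partition, func).entrySet())
                out.merge(e.getKey(), e.getValue(), func);
        return out;
    }

    // Number of records that would cross the shuffle after the map-side combine.
    static int shuffledRecordCount(List<int[][]> partitions, BinaryOperator<Integer> func) {
        int n = 0;
        for (int[][] partition : partitions) n += reducePartition(partition, func).size();
        return n;
    }

    static List<int[][]> samplePartitions() {
        return Arrays.asList(
                new int[][] {{1, 1}, {2, 1}, {1, 1}},
                new int[][] {{1, 1}, {3, 1}});
    }

    public static void main(String[] args) {
        System.out.println(reduceByKey(samplePartitions(), Integer::sum));
        System.out.println(shuffledRecordCount(samplePartitions(), Integer::sum));
    }
}
```

In the sample, five input records shrink to four partial results before the merge; with more repeated keys per partition the saving grows, which is the reason reduceByKey() is usually preferred over groupByKey() for aggregations.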
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to (K, V) pairs
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    JavaPairRDD<Integer, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(reduceByKeyRDD.collect());

    // With an explicit numPartitions
    JavaPairRDD<Integer, Integer> reduceByKeyRDD2 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    }, 2);
    System.out.println(reduceByKeyRDD2.collect());

    // Custom partitioner
    JavaPairRDD<Integer, Integer> reduceByKeyRDD4 = javaPairRDD.reduceByKey(new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object o) {
            return (o.toString()).hashCode() % numPartitions();
        }
    }, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(reduceByKeyRDD4.collect());

foldByKey
Official documentation:
Merge the values for each key using an associative function and a neutral "zero value" which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
Function prototypes:
    def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
This function folds and merges the values V that belong to each key K with the given function; the zeroValue parameter is the initial value for V.
The parameters are:
- zeroValue: the initial value;
- numPartitions: the number of partitions; the default partitioning function is HashPartitioner;
- partitioner: the partitioning function;
- func: the user-defined merge function.
Source code analysis:
    def foldByKey(
        zeroValue: V,
        partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)

      // When deserializing, use a lazy val to create just one instance of the serializer per task
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

      val cleanedFunc = self.context.clean(func)
      combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
    }

As the implementation shows, foldByKey() is built on combineByKey(): createCombiner merely folds a fresh copy of zeroValue into the first V of a key, while mergeValue and mergeCombiners are both the user-supplied function. Because createCombiner runs once per key in every partition that contains the key, zeroValue may be applied several times for the same key. When aggregating the values of a key, take care to choose a zero value that does not change the result.
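A non-neutral zero value, such as the "X" used in the example of this section, makes the result depend on how the records are partitioned. The plain-Java sketch below shows this under stated assumptions: `FoldByKeySketch` and the String[][] layout of {key, value} are hypothetical, and the per-partition fold imitates the map-side combine rather than calling Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Illustration only: arrays of {key, value} stand in for partitions (hypothetical names).
final class FoldByKeySketch {

    static Map<String, String> foldByKey(List<String[][]> partitions, String zeroValue,
                                         BinaryOperator<String> func) {
        Map<String, String> result = new TreeMap<>();
        for (String[][] partition : partitions) {
            Map<String, String> partial = new TreeMap<>();
            for (String[] r : partition) {
                // The first value of a key in this partition is folded into a fresh zero value.
                String acc = partial.containsKey(r[0]) ? partial.get(r[0]) : zeroValue;
                partial.put(r[0], func.apply(acc, r[1]));
            }
            // Partial results of the same key are merged with the same function.
            for (Map.Entry<String, String> e : partial.entrySet())
                result.merge(e.getKey(), e.getValue(), func);
        }
        return result;
    }

    // Folds the records ("1","a") and ("1","b") either in one partition or split across two.
    static String demo(boolean split) {
        BinaryOperator<String> concat = (a, b) -> a + ":" + b;
        List<String[][]> partitions = split
                ? Arrays.asList(new String[][] {{"1", "a"}}, new String[][] {{"1", "b"}})
                : Arrays.asList(new String[][] {{"1", "a"}, {"1", "b"}}, new String[][] {});
        return foldByKey(partitions, "X", concat).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

With both records in one partition "X" appears once; split across two partitions it appears twice. A truly neutral zero (the empty string for plain concatenation, 0 for addition) would give the same result either way.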
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random rand = new Random(10);
    JavaPairRDD<Integer, String> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Integer integer) throws Exception {
            return new Tuple2<Integer, String>(integer, Integer.toString(rand.nextInt(10)));
        }
    });

    JavaPairRDD<Integer, String> foldByKeyRDD = javaPairRDD.foldByKey("X", new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD.collect());

    // With an explicit numPartitions
    JavaPairRDD<Integer, String> foldByKeyRDD1 = javaPairRDD.foldByKey("X", 2, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD1.collect());

    // Custom partitioner
    JavaPairRDD<Integer, String> foldByKeyRDD2 = javaPairRDD.foldByKey("X", new Partitioner() {
        @Override
        public int numPartitions() {
            return 3;
        }
        @Override
        public int getPartition(Object key) {
            return key.toString().hashCode() % numPartitions();
        }
    }, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD2.collect());

zipPartitions
Official documentation:
Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.
Function prototypes:
    def zipPartitions[U, V](
        other: JavaRDDLike[U, _],
        f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]
This function combines two RDDs partition by partition to form a new RDD.
Source code analysis:
    def zipPartitions[B: ClassTag, V: ClassTag]
        (rdd2: RDD[B], preservesPartitioning: Boolean)
        (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
      new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
    }

    private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
        sc: SparkContext,
        var f: (Iterator[A], Iterator[B]) => Iterator[V],
        var rdd1: RDD[A],
        var rdd2: RDD[B],
        preservesPartitioning: Boolean = false)
      extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

      override def compute(s: Partition, context: TaskContext): Iterator[V] = {
        val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
        f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
      }

      override def clearDependencies() {
        super.clearDependencies()
        rdd1 = null
        rdd2 = null
        f = null
      }
    }

As the source shows, zipPartitions() creates a ZippedPartitionsRDD2, which extends ZippedPartitionsBaseRDD. The getPartitions method of ZippedPartitionsBaseRDD checks that the RDDs being combined have the same number of partitions, but nothing in the implementation requires each partition to hold the same number of elements.
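The contract (same number of partitions, any number of elements per partition) can be sketched in plain Java. `ZipPartitionsSketch` is a hypothetical name, nested lists stand in for partitions, and the partition layout in `demo()` is an assumed one rather than something Spark was asked to produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Illustration only: nested lists stand in for partitioned RDDs (hypothetical names).
final class ZipPartitionsSketch {

    static <A, B, R> List<R> zipPartitions(List<List<A>> left, List<List<B>> right,
            BiFunction<Iterator<A>, Iterator<B>, List<R>> f) {
        // Only the partition counts have to match; element counts may differ.
        if (left.size() != right.size()) {
            throw new IllegalArgumentException("unequal numbers of partitions");
        }
        List<R> out = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            out.addAll(f.apply(left.get(i).iterator(), right.get(i).iterator()));
        }
        return out;
    }

    static List<String> demo() {
        // Assumed layout of the two lists from this section's example over 3 partitions.
        List<List<Integer>> left = Arrays.asList(
                Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
        List<List<Integer>> right = Arrays.asList(
                Arrays.asList(3, 2), Arrays.asList(12, 5), Arrays.asList(6, 1));
        // Pairs elements until either iterator of the partition runs out.
        BiFunction<Iterator<Integer>, Iterator<Integer>, List<String>> pairUp = (a, b) -> {
            List<String> pairs = new ArrayList<>();
            while (a.hasNext() && b.hasNext()) pairs.add(a.next() + "_" + b.next());
            return pairs;
        };
        return zipPartitions(left, right, pairUp);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The last partition on the left has one element more than its counterpart; the supplied function simply ignores it, and no error is raised.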
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
    JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1, new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
        @Override
        public Iterable<String> call(Iterator<Integer> integerIterator, Iterator<Integer> integerIterator2) throws Exception {
            LinkedList<String> linkedList = new LinkedList<String>();
            // Pair elements until either partition runs out
            while (integerIterator.hasNext() && integerIterator2.hasNext())
                linkedList.add(integerIterator.next().toString() + "_" + integerIterator2.next().toString());
            return linkedList;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());

zip
Official documentation:
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
Function prototypes:
    def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
This function combines two RDDs into a single RDD of key/value pairs.
Source code analysis:
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
        new Iterator[(T, U)] {
          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
            case (true, true) => true
            case (false, false) => false
            case _ => throw new SparkException("Can only zip RDDs with " +
              "same number of elements in each partition")
          }
          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
      }
    }

As the source shows, zip() is built on zipPartitions() with preservesPartitioning set to false; preservesPartitioning indicates whether the partitioner of the parent RDD is kept. In addition, the two RDDs must have the same number of partitions and the same number of elements in each partition, otherwise an exception is thrown.
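The hasNext logic of the zipped iterator can be reproduced for a single pair of partitions in plain Java. This is a sketch: `ZipSketch` and the string rendering of pairs are hypothetical, and an IllegalStateException stands in for the SparkException thrown by the real code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration only: two iterators stand in for one pair of zipped partitions (hypothetical names).
final class ZipSketch {

    static <T, U> List<String> zipPartition(Iterator<T> thisIter, Iterator<U> otherIter) {
        List<String> out = new ArrayList<>();
        while (true) {
            boolean a = thisIter.hasNext();
            boolean b = otherIter.hasNext();
            if (a && b) {
                out.add("(" + thisIter.next() + "," + otherIter.next() + ")");
            } else if (!a && !b) {
                return out; // both exhausted at the same time: sizes matched
            } else {
                // Exactly one side still has elements: the partitions differ in size.
                throw new IllegalStateException(
                        "Can only zip partitions with the same number of elements");
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(zipPartition(Arrays.asList(5, 1).iterator(), Arrays.asList(3, 2).iterator()));
    }
}
```

Unlike the function passed to zipPartitions() in the previous section, this pairing refuses to drop surplus elements, which is why zip() has the stricter precondition.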
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1, 7);
    // Use the same number of partitions as javaRDD; zip() requires matching partitions
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
    JavaPairRDD<Integer, Integer> zipRDD = javaRDD.zip(javaRDD1);
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());

zipWithIndex
Official documentation:
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a spark job when this RDD contains more than one partitions.
Function prototypes:
    def zipWithIndex(): JavaPairRDD[T, JLong]
This function pairs every element of the RDD with its index in the RDD, producing an RDD of key/value pairs.
Source code analysis:
    def zipWithIndex(): RDD[(T, Long)] = withScope {
      new ZippedWithIndexRDD(this)
    }

    /** The start index of each partition. */
    @transient private val startIndices: Array[Long] = {
      val n = prev.partitions.length
      if (n == 0) {
        Array[Long]()
      } else if (n == 1) {
        Array(0L)
      } else {
        prev.context.runJob(
          prev,
          Utils.getIteratorSize _,
          0 until n - 1, // do not need to count the last partition
          allowLocal = false
        ).scanLeft(0L)(_ + _)
      }
    }

    override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
      val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
      firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
        (x._1, split.startIndex + x._2)
      }
    }

As the source shows, the function returns a ZippedWithIndexRDD. That RDD first computes startIndices, the index at which each partition starts; with more than one partition this runs a job that counts the elements of every partition except the last. compute() then applies Scala's zipWithIndex within the partition and adds the start index of the partition to obtain the final index.
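The two steps (running sum of partition sizes, then local index plus start index) are easy to follow in plain Java. `ZipWithIndexSketch` is a hypothetical name, and the partition layout in `samplePartitions()` is an assumed one for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: nested lists stand in for a partitioned RDD (hypothetical names).
final class ZipWithIndexSketch {

    // Running sum of the sizes of all partitions but the last, starting at 0.
    static long[] startIndices(List<List<Integer>> partitions) {
        long[] start = new long[partitions.size()];
        for (int i = 1; i < start.length; i++) {
            start[i] = start[i - 1] + partitions.get(i - 1).size();
        }
        return start;
    }

    // Global index = start index of the partition + position inside the partition.
    static List<String> zipWithIndex(List<List<Integer>> partitions) {
        long[] start = startIndices(partitions);
        List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<Integer> part = partitions.get(p);
            for (int i = 0; i < part.size(); i++) {
                out.add("(" + part.get(i) + "," + (start[p] + i) + ")");
            }
        }
        return out;
    }

    static List<List<Integer>> samplePartitions() {
        return Arrays.asList(Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
    }

    public static void main(String[] args) {
        System.out.println(zipWithIndex(samplePartitions()));
    }
}
```

The size of the last partition never enters the running sum, which matches the "do not need to count the last partition" comment in the source.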
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    JavaPairRDD<Integer, Long> zipWithIndexRDD = javaRDD.zipWithIndex();
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());

zipWithUniqueId
Official documentation:
Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
Function prototypes:
    def zipWithUniqueId(): JavaPairRDD[T, JLong]
This function pairs every element of the RDD with a unique ID, producing key/value pairs. The ID is generated as follows: the first element of each partition gets the index of that partition, and the element at zero-based position i of a partition gets (i * total number of partitions of the RDD) + (index of the partition).
Source code analysis:
    def zipWithUniqueId(): RDD[(T, Long)] = withScope {
      val n = this.partitions.length.toLong
      this.mapPartitionsWithIndex { case (k, iter) =>
        iter.zipWithIndex.map { case (item, i) =>
          (item, i * n + k)
        }
      }
    }

As the source shows, zipWithUniqueId() uses mapPartitionsWithIndex() to obtain the partition index k of each element and computes the ID as i * n + k, where i is the position of the element within its partition and n is the number of partitions.
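The formula i * n + k can be checked by hand with a small plain-Java sketch. `ZipWithUniqueIdSketch` is a hypothetical name, and the partition layout in `samplePartitions()` is an assumed one for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: nested lists stand in for a partitioned RDD (hypothetical names).
final class ZipWithUniqueIdSketch {

    static List<String> zipWithUniqueId(List<List<Integer>> partitions) {
        long n = partitions.size();                  // number of partitions
        List<String> out = new ArrayList<>();
        for (int k = 0; k < partitions.size(); k++) { // k: partition index
            List<Integer> part = partitions.get(k);
            for (int i = 0; i < part.size(); i++) {   // i: position inside the partition
                out.add("(" + part.get(i) + "," + (i * n + k) + ")");
            }
        }
        return out;
    }

    static List<List<Integer>> samplePartitions() {
        return Arrays.asList(Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
    }

    public static void main(String[] args) {
        System.out.println(zipWithUniqueId(samplePartitions()));
    }
}
```

Every ID depends only on values known inside the partition, so no counting job is needed; the price is gaps in the sequence (IDs 6 and 7 are unused in the sample because the first two partitions are shorter than the third).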
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    JavaPairRDD<Integer, Long> zipWithUniqueIdRDD = javaRDD.zipWithUniqueId();
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithUniqueIdRDD.collect());