好友
阅读权限 10
听众
最后登录 1970-1-1
Key-Value 类型
1 partitionBy 案例
1. 作用:对 pairRDD进行分区操作,如果原有的 partionRDD和现有的 partionRDD是一致的话就不进行分区, 否则会生成 ShuffleRDD,即会产生 shuffle过程。
2. 需求:创建一个 4个分区的 RDD,对其重新分区
1)创建一个 RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
( 2)查看 RDD的分区数
[Scala] 纯文本查看 复制代码
scala> rdd.partitions.size res24: Int = 4
( 3)对 RDD重新分区
[Scala] 纯文本查看 复制代码
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
( 4)查看新 RDD的分区数
[Scala] 纯文本查看 复制代码
scala> rdd2.partitions.size res25: Int = 2
2 groupByKey 案例
1. 作用: groupByKey也是对每个 key进行操作,但只生成一个 sequence。
2. 需求:创建一个 pairRDD,将相同 key对应值聚合到一个 sequence中,并计算相同 key对应值的相加结果。
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val words = Array("one", "two", "two", "three", "three", "three") words: Array[String] = Array(one, two, two, three, three, three)
[Scala] 纯文本查看 复制代码
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
( 2)将相同 key对应值聚合到一个 sequence中
[Scala] 纯文本查看 复制代码
scala> val group = wordPairsRDD.groupByKey() group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
( 3)打印结果
[Scala] 纯文本查看 复制代码
scala> group.collect() res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
( 4)计算相同 key对应值的相加结果
[Scala] 纯文本查看 复制代码
scala> group.map(t => (t._1, t._2.sum)) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
( 5)打印结果
[Scala] 纯文本查看 复制代码
scala> res2.collect() res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
3 reduceByKey(func, [numTasks]) 案例
1. 在一个 (K,V)的 RDD上调用,返回一个 (K,V)的 RDD,使用指定的 reduce函数,将相同 key的值聚合到一起, reduce任务的个数可以通过第二个可选的参数来设置。
2. 需求:创建一个 pairRDD,计算相同 key对应值的相加结果
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
( 2)计算相同 key对应值的相加结果
[Scala] 纯文本查看 复制代码
scala> val reduce = rdd.reduceByKey((x,y) => x+y) reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
( 3)打印结果
[Scala] 纯文本查看 复制代码
scala> reduce.collect() res29: Array[(String, Int)] = Array((female,6), (male,7))
4 reduceByKey 和 groupByKey 的区别
1. reduceByKey :按照 key 进行聚合,在 shuffle 之前有 combine (预聚合)操作,返回结果 RDD[k,v].
2. groupByKey :按照 key 进行分组,直接进行 shuffle 。
3. 开发指导: reduceByKey 比 groupByKey ,建议使用。但是需要注意是否会影响业务逻辑。
5 aggregateByKey案例
参数: (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
1. 作用:在 kv对的 RDD中,,按 key将 value进行分组合并,合并时,将每个 value和初始值作为 seq函数的参数,进行计算,返回的结果作为一个新的 kv对,然后再将结果按照 key进行合并,最后将每个分组的 value传递给 combine函数进行计算(先将前两个 value进行计算,将返回结果和下一个 value传给 combine函数,以此类推),将 key与计算结果作为一个新的 kv对输出。
2. 参数描述:
( 1 ) zeroValue : 给每一个分区中的每一个 key 一个初始值;
( 2 ) seqOp : 函数用于在每一个分区中用初始值逐步迭代 value ;
( 3 ) combOp : 函数用于合并每个分区中的结果。
3. 需求:创建一个 pairRDD,取出每个分区相同 key对应值的最大值,然后相加
4. 需求分析
图 1-aggregate 案例分析
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
( 2)取出每个分区相同 key对应值的最大值,然后相加
[Scala] 纯文本查看 复制代码
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_) agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
( 3)打印结果
[Scala] 纯文本查看 复制代码
scala> agg.collect() res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
6 foldByKey 案例
参数: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
1. 作用: aggregateByKey的简化操作, seqop和 combop相同
2. 需求:创建一个 pairRDD,计算相同 key对应值的相加结果
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24
( 2)计算相同 key对应值的相加结果
[Scala] 纯文本查看 复制代码
scala> val agg = rdd.foldByKey(0)(_+_) agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26
( 3)打印结果
[Scala] 纯文本查看 复制代码
scala> agg.collect() res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
7 combineByKey[C] 案例
参数: (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
1. 作用:对相同 K,把 V合并成一个集合。
( 1 ) createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素 ,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值
( 2 ) mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
( 3 ) mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
3. 需求:创建一个 pairRDD,根据 key计算每种 key的均值。(先计算每个 key出现的次数以及可以对应值的总和,再相除得到结果)
4. 需求分析:
图 2- combineByKey 案例分析
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26
( 2)将相同 key对应的值相加,同时记录该 key出现的次数,放入一个二元组
[Scala] 纯文本查看 复制代码
scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at <console>:28
( 3)打印合并后的结果
[Scala] 纯文本查看 复制代码
scala> combine.collect res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
( 4)计算平均值
[Scala] 纯文本查看 复制代码
scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)} result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30
( 5)打印结果
[Scala] 纯文本查看 复制代码
scala> result.collect() res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
8 sortByKey([ascending], [numTasks]) 案例
1. 作用:在一个 (K,V)的 RDD上调用, K必须实现 Ordered接口,返回一个按照 key进行排序的 (K,V)的 RDD
2. 需求:创建一个 pairRDD,按照 key的正序和倒序进行排序
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
( 2)按照 key的正序
[Scala] 纯文本查看 复制代码
scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
( 3)按照 key的倒序
[Scala] 纯文本查看 复制代码
scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
9 mapValues 案例
1. 针对于 (K,V)形式的类型只对 V进行操作
2. 需求:创建一个 pairRDD,并将 value添加字符串 "|||"
( 1)创建一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24
( 2)对 value添加字符串 "|||"
[Scala] 纯文本查看 复制代码
scala> rdd3.mapValues(_+"|||").collect() res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
10 join(otherDataset, [numTasks]) 案例
1. 作用:在类型为 (K,V)和 (K,W)的 RDD上调用,返回一个相同 key对应的所有元素对在一起的 (K,(V,W))的 RDD
2. 需求:创建两个 pairRDD,并将 key相同的数据聚合到一个 元组 。
( 1)创建第一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24
( 2)创建第二个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
( 3) join操作并打印结果
[Scala] 纯文本查看 复制代码
scala> rdd.join(rdd1).collect() res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
11 cogroup(otherDataset, [numTasks]) 案例
1. 作用:在类型为 (K,V)和 (K,W)的 RDD上调用,返回一个 (K,(Iterable<V>,Iterable<W>))类型的 RDD
2. 需求:创建两个 pairRDD,并将 key相同的数据聚合到一个 迭代器 。
( 1)创建第一个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24
( 2)创建第二个 pairRDD
[Scala] 纯文本查看 复制代码
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
( 3) cogroup两个 RDD并打印结果
[Scala] 纯文本查看 复制代码
scala> rdd.cogroup(rdd1).collect() res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
partitionBy案例
reduceByKey
aggregateByKey
aggregateByKey
foldByKey
combineByKey
combineByKey
sortByKey
mapValues
join
cogroup
partitionBy案例