Key-Value 类型算子
都在 org.apache.spark.rdd.PairRDDFunctions
里面
partitionBy(partitioner: Partitioner)
- 作用:按照RDD的
Key
进行分区操作,如果原有的
分区数和现有的
分区数是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。- 参数 partitioner:分区器,需要导入
org.apache.spark.HashPartitioner
- 参数 partitioner:分区器,需要导入
- 需求:创建一个4个分区的RDD,对其重新分区
- 延展:自定义分区器(重点)
创建RDD
val listRDD: RDD[(String, String)] = sc.makeRDD(List(("aa", "a"), ("ab", "b"), ("bc", "c"), ("bd", "d")),4)
转换RDD
HashPartitioner
分区器最终导致数据在哪个分区,有它自己的算法,所以我们不能控制,就需要自定义分区器
val partRDD: RDD[(String, String)] = listRDD.partitionBy(new HashPartitioner(3))
结果输出
看不出来是把 Key
进行了什么算法分区的
partRDD.saveAsTextFile("outpath")
groupByKey()
- 作用:把有
相同Key
的 Value 聚合在一起 ,最后生成生成一个K-V集合
创建RDD
val listRDD: RDD[String] = sc.makeRDD(List("Python", "Java", "Scala", "Python")) val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))
转换RDD
val resRDD: RDD[(String, Int)] = kvRDD.groupByKey()
结果输出
val r: Array[(String, Int)] = resRDD.collect() r.foreach(println) ------------------ (Java,CompactBuffer(1)) (Scala,CompactBuffer(1)) (Python,CompactBuffer(1, 1))
reduceByKey(func: (V, V) => V, [numPartitions: Int])
- 作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,第二个是聚合之后的分区数
func: (V, V) => V
:有相同 k 的 v ,这个 v 之间的运算聚合
创建RDD
val listRDD: RDD[String] = sc.makeRDD(List("Python", "Java", "Scala", "Python")) val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))
转换RDD
val resRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)
结果输出
val r: Array[(String, Int)] = resRDD.collect() r.foreach(println) ------------------ (Java,1) (Scala,1) (Python,2)
aggregateByKey(zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U)
- 作用:能先把
分区内(seqOp)
有相同Key
的数据进行聚合,然后再进行分区间(combOp)
有相同Key
的数据进行聚合。就是能定义分区内
和分区间
进行不同的聚合方式
.- zeroValue:是先
两两做聚合
,所以在取第一个 Key
做聚合时,需要一个zeroValue (初始值)
来两两运算。 - seqOp:
分区内
做聚合的函数 - combOp:
分区间
做聚合的函数
- zeroValue:是先
需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
创建RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)
转换RDD
val resRDD: RDD[(String, Int)] = kvRDD.aggregateByKey(0)(List(_,_).max, _+_)
结果输出
val r: Array[(String, Int)] = resRDD.collect() println(r.mkString(",")) ------------------ (b,13),(a,3),(c,5)
foldByKey(zeroValue: U)(func: (V, V) => V))
- 作用:是 aggregateByKey() 的精简版,使
分区内
和分区间
进行相同的聚合操作。其实底层还是 先聚合 分区内,再聚合分区间。- zeroValue:是先
两两做聚合
,所以在取第一个 Key
做聚合时,需要一个zeroValue (初始值)
来两两运算。 - func:做聚合的函数
- zeroValue:是先
需求:创建一个pairRDD,取出不同Key对应 pairRDD最大值。
创建RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)
转换RDD
val resRDD: RDD[(String, Int)] = kvRDD.foldByKey(0)(List(_,_).max)
结果输出
val r: Array[(String, Int)] = resRDD.collect() println(r.mkString(",")) ------------------ (b,8),(a,3),(c,5)
combineByKey(参数是:三个函数)
- 作用:对相同K,把V合并成一个集合。这个方法是 aggregateByKey() 的
完全版
。初始值
的大小
,格式
需要使用函数
进行处理。- createCombiner: V => C:对
第一个 V
进行初始化处理
,大小,格式等。 - mergeValue: (C, V):这是
分区内
的聚合,C 的格式 是初始化之后的格式
,V 是两两操作的第二个值 - mergeCombiners: (C, C) => C:这是
分区间
的处理
- createCombiner: V => C:对
需求:创建一个pairRDD,把相同的Key
的Value相加,并记录相同的Key
有多少个。
创建RDD
val kvRDD = sc.makeRDD(List(("a", 11), ("a", 31), ("b", 51), ("b", 81), ("c", 51), ("b", 71)), 2)
转换RDD
val resRDD: RDD[(String, (Int, Int))] = kvRDD.combineByKey( // 对第一个 V 做初始化格式处理,形成元组,为了方便记录次数 (_, 1), // 分区内部 两两处理, (a: (Int, Int), v) => (a._1 + v, a._2 + 1), // 分区间,和加和,次数加次数 (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) )
结果输出
val r: Array[(String, (Int, Int))] = resRDD.collect() println(r.mkString(",")) ----------------------- (b,(203,3)),(a,(42,2)),(c,(51,1))
sortByKey([ascending: Boolean], [numPartitions: Int])
- 作用:通过 K-V 的 K 来排序。
- [ascending: Boolean]:默认升序
- [numPartitions: Int]:分区数,或者叫任务数
创建RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)
转换RDD
降序
val resRDD: RDD[(String, Int)] = kvRDD.sortByKey(false)
结果输出
val r: Array[(String, Int)] = resRDD.collect() println(r.mkString(",")) ------------------ (c,5),(b,5),(b,8),(b,7),(a,1),(a,3)
mapValues(f: V => U): RDD[(K, U))
- 作用:传入函数,只对 K-V 里的 V 做处理
- 需求:给每个 K-V 的 V 加上 ¥ 符号。
创建RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)
转换RDD
val resRDD: RDD[(String, String)] = kvRDD.mapValues(_ + "¥")
结果输出
val r: Array[(String, String)] = resRDD.collect() println(r.mkString(",")) ------------------------ (a,1¥),(a,3¥),(b,5¥),(b,8¥),(c,5¥),(b,7¥)
join(other: RDD[(K, W)])
- 作用:组合。在类型为
(K,V)
和(K,W)
的两个RDD上
调用。两个RDD,以相同的K
为依据,不同的 RDD 相互组合 V
,返回(K,(V,W))形式的RDD。
创建RDD
创建 2 个RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("b", 5), ("b", 8), ("c", 5),("x",1)), 2) val kvRDD1 = sc.makeRDD(List(("a", 1), ("c", 2), ("b", 5)))
转换RDD
val resRDD: RDD[(String, (Int, Int))] = kvRDD.join(kvRDD1)
结果输出
注意输出没有 (“x”,1) 这一对,说明当只有单个
时,不能输出
val r: Array[(String, (Int, Int))] = resRDD.collect() println(r.mkString(",")) ------------------------ (a,(1,1)),(b,(5,5)),(b,(8,5)),(c,(5,2))
cogroup(other: RDD[(K, W)])
- 作用:组合。这个组合更强大,把
两个 K-V RDD
,通果相同的 K
把 V 组合起来,在同一RDD 内 V
在一个迭代类型内
,然后再把另外一个RDD组合的迭代类型
一起组合成一个大K-V 的 V
。创建RDD
创建 2 个RDD
val kvRDD = sc.makeRDD(List(("a", 1), ("b", 5), ("b", 8), ("c", 5),("x",1))) val kvRDD1 = sc.makeRDD(List(("a", 1), ("c", 2), ("b", 5),("b", 5)))
转换RDD
val resRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = kvRDD.cogroup(kvRDD1)
结果输出
注意输出没有 (“x”,1) 这一对,说明当只有单个
时,不能输出
val r: Array[(String, (Iterable[Int], Iterable[Int]))] = resRDD.collect() r.foreach(println) ------------------------ (a,(CompactBuffer(1),CompactBuffer(1))) (b,(CompactBuffer(5, 8),CompactBuffer(5, 5))) (c,(CompactBuffer(5),CompactBuffer(2))) (x,(CompactBuffer(1),CompactBuffer()))
版权声明:《 Spark RDD 算子 Key-Value类型 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-17 05:02:13