Spark RDD 算子 Key-Value类型

Key-Value 类型算子

都在 org.apache.spark.rdd.PairRDDFunctions 里面

partitionBy(partitioner: Partitioner)

  • 作用:按照RDD的 Key 进行分区操作,如果原有的分区数和现有的分区数是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
    • 参数 partitioner:分区器,需要导入org.apache.spark.HashPartitioner
  • 需求:创建一个4个分区的RDD,对其重新分区
  • 延展:自定义分区器(重点)

创建RDD

val listRDD: RDD[(String, String)] = sc.makeRDD(List(("aa", "a"), ("ab", "b"), ("bc", "c"), ("bd", "d")),4)

转换RDD

HashPartitioner分区器最终导致数据在哪个分区,有它自己的算法,所以我们不能控制,就需要自定义分区器

val partRDD: RDD[(String, String)] = listRDD.partitionBy(new HashPartitioner(3))

结果输出

看不出来是把 Key 进行了什么算法分区的

partRDD.saveAsTextFile("outpath")

mark

groupByKey()

  • 作用:把有相同Key的 Value 聚合在一起 ,最后生成生成一个K-V集合

创建RDD

val listRDD: RDD[String] = sc.makeRDD(List("Python", "Java", "Scala", "Python"))
val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))

转换RDD

val resRDD: RDD[(String, Int)] = kvRDD.groupByKey()

结果输出

val r: Array[(String, Int)] = resRDD.collect()
r.foreach(println)
------------------
(Java,CompactBuffer(1))
(Scala,CompactBuffer(1))
(Python,CompactBuffer(1, 1))

reduceByKey(func: (V, V) => V, [numPartitions: Int])

  • 作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,第二个是聚合之后的分区数
    • func: (V, V) => V :有相同 k 的 v ,这个 v 之间的运算聚合

创建RDD

val listRDD: RDD[String] = sc.makeRDD(List("Python", "Java", "Scala", "Python"))
val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))

转换RDD

val resRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)

结果输出

val r: Array[(String, Int)] = resRDD.collect()
 r.foreach(println)
------------------
(Java,1)
(Scala,1)
(Python,2)

aggregateByKey(zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U)

  • 作用:能先把分区内(seqOp)相同Key 的数据进行聚合,然后再进行分区间(combOp)相同Key的数据进行聚合。就是能定义分区内分区间进行不同的聚合方式.
    • zeroValue:是先 两两做聚合,所以在取第一个 Key 做聚合时,需要一个zeroValue (初始值)来两两运算。
    • seqOp:分区内做聚合的函数
    • combOp:分区间做聚合的函数

需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加

创建RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)

转换RDD

val resRDD: RDD[(String, Int)] = kvRDD.aggregateByKey(0)(List(_,_).max, _+_)

结果输出

val r: Array[(String, Int)] = resRDD.collect()
println(r.mkString(","))
------------------
(b,13),(a,3),(c,5)

foldByKey(zeroValue: U)(func: (V, V) => V))

  • 作用:是 aggregateByKey() 的精简版,使分区内分区间进行相同的聚合操作。其实底层还是 先聚合 分区内,再聚合分区间。
    • zeroValue:是先 两两做聚合,所以在取第一个 Key 做聚合时,需要一个zeroValue (初始值)来两两运算。
    • func:做聚合的函数

需求:创建一个pairRDD,取出不同Key对应 pairRDD最大值。

创建RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)

转换RDD

val resRDD: RDD[(String, Int)] = kvRDD.foldByKey(0)(List(_,_).max)

结果输出

val r: Array[(String, Int)] = resRDD.collect()
println(r.mkString(","))
------------------
(b,8),(a,3),(c,5)

combineByKey(参数是:三个函数)

  • 作用:对相同K,把V合并成一个集合。这个方法是 aggregateByKey() 的完全版初始值大小格式需要使用函数进行处理。
    • createCombiner: V => C:对第一个 V 进行初始化处理,大小,格式等。
    • mergeValue: (C, V):这是分区内的聚合,C 的格式 是初始化之后的格式,V 是两两操作的第二个值
    • mergeCombiners: (C, C) => C:这是分区间的处理

需求:创建一个pairRDD,把相同的Key 的Value相加,并记录相同的Key有多少个。

创建RDD

val kvRDD = sc.makeRDD(List(("a", 11), ("a", 31), ("b", 51), ("b", 81), ("c", 51), ("b", 71)), 2)

转换RDD

val resRDD: RDD[(String, (Int, Int))] = kvRDD.combineByKey(
            // 对第一个 V 做初始化格式处理,形成元组,为了方便记录次数
            (_, 1),
            // 分区内部 两两处理,
            (a: (Int, Int), v) => (a._1 + v, a._2 + 1),
            // 分区间,和加和,次数加次数
            (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
        )

结果输出

val r: Array[(String, (Int, Int))] = resRDD.collect()
println(r.mkString(","))
-----------------------
(b,(203,3)),(a,(42,2)),(c,(51,1))

sortByKey([ascending: Boolean], [numPartitions: Int])

  • 作用:通过 K-V 的 K 来排序。
    • [ascending: Boolean]:默认升序
    • [numPartitions: Int]:分区数,或者叫任务数

创建RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)

转换RDD

降序

val resRDD: RDD[(String, Int)] = kvRDD.sortByKey(false)

结果输出

val r: Array[(String, Int)] = resRDD.collect()
println(r.mkString(","))
------------------
(c,5),(b,5),(b,8),(b,7),(a,1),(a,3)

mapValues(f: V => U): RDD[(K, U))

  • 作用:传入函数,只对 K-V 里的 V 做处理
  • 需求:给每个 K-V 的 V 加上 ¥ 符号。

创建RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 5), ("b", 8), ("c", 5), ("b", 7)), 2)

转换RDD

val resRDD: RDD[(String, String)] = kvRDD.mapValues(_ + "¥")

结果输出

val r: Array[(String, String)] = resRDD.collect()
println(r.mkString(","))
------------------------
(a,1¥),(a,3¥),(b,5¥),(b,8¥),(c,5¥),(b,7¥)

join(other: RDD[(K, W)])

  • 作用:组合。在类型为(K,V)(K,W)两个RDD上调用。两个RDD,以相同的K为依据,不同的 RDD 相互组合 V,返回(K,(V,W))形式的RDD。

创建RDD

创建 2 个RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("b", 5), ("b", 8), ("c", 5),("x",1)), 2)
val kvRDD1 = sc.makeRDD(List(("a", 1), ("c", 2), ("b", 5)))

转换RDD

val resRDD: RDD[(String, (Int, Int))] = kvRDD.join(kvRDD1)

结果输出

注意输出没有 (“x”,1) 这一对,说明当只有单个时,不能输出

val r: Array[(String, (Int, Int))] = resRDD.collect()
println(r.mkString(","))
------------------------
(a,(1,1)),(b,(5,5)),(b,(8,5)),(c,(5,2))

cogroup(other: RDD[(K, W)])

  • 作用:组合。这个组合更强大,把两个 K-V RDD,通果相同的 K把 V 组合起来,在同一RDD 内 V一个迭代类型内,然后再把另外一个RDD组合的迭代类型一起组合成一个大K-V 的 V

    创建RDD

创建 2 个RDD

val kvRDD = sc.makeRDD(List(("a", 1), ("b", 5), ("b", 8), ("c", 5),("x",1)))
val kvRDD1 = sc.makeRDD(List(("a", 1), ("c", 2), ("b", 5),("b", 5)))

转换RDD

val resRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = kvRDD.cogroup(kvRDD1)

结果输出

注意输出没有 (“x”,1) 这一对,说明当只有单个时,不能输出

val r: Array[(String, (Iterable[Int], Iterable[Int]))] = resRDD.collect()
r.foreach(println)
------------------------
(a,(CompactBuffer(1),CompactBuffer(1)))
(b,(CompactBuffer(5, 8),CompactBuffer(5, 5)))
(c,(CompactBuffer(5),CompactBuffer(2)))
(x,(CompactBuffer(1),CompactBuffer()))
发表评论 / Comment

用心评论~