Spark RDD 算子 (Value类型)

前言

RDD整体上分为 Value类型Key-Value类型

Value类型

map(func)

  • 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
  • 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD

创建RDD

val intRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

方法转换重点是每一个数据

val resRDD: RDD[Int] = intRDD.map(_ * 2)

结果输出

val r = resRDD.collect().mkString(",")
println(r)
----------
2,4,6,8,10,12,14,16,18,20

mapPartitions(func)

  • 作用:类似于map,但独立地在RDD的每一个分区上运行,因此func的函数类型必须是 (Interator[T]) => Iterator[U]
    • 效率要优于 map()
    • 但分区数据太大容易造成内存溢出的错误(OOM)
  • 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD

创建RDD

val intRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

方法转换重点是每一个分区,所以匿名函数``传入是可迭代类型,返回也要可迭代类型

val resRDD: RDD[Int] = intRDD.mapPartitions((iter)=> {
            // 这里的 map() 是 Scala 中的函数
            iter.map(i => i*2)
        })

结果输出

val r = resRDD.collect().mkString(",")
println(r)
----------
2,4,6,8,10,12,14,16,18,20

mapPartitionsWithIndex(func)

  • 作用:类似于mapPartitions,但func带有一个整数参数表示所在分区的索引值,因此func的函数类型必须是 (Int, Interator[T]) => Iterator[U]
  • 需求:创建一个RDD,使元素跟所在分区索引形成一个元组组成一个新的RDD

创建RDD

val intRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

方法转换重点是每一个分区

val resRDD: RDD[(String, Int)] = intRDD.mapPartitionsWithIndex {
            // 有多个传入参数时,一般使用 case
            case (num, datas) => {
                datas.map(("分区" + num, _))
            }
        }

结果输出

val r = resRDD.collect().mkString(",")
println(r)
----------
(分区1,1),(分区3,2),(分区4,3),(分区6,4),(分区7,5),(分区9,6),(分区11,7),(分区12,8),(分区14,9),(分区15,10)

flatMap(func)

  • 作用:类似于Scala中的flatMap(),每一个输入元素可以被映射0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
  • 需求:创建一个Array数组中的 List元素 RDD,映射成Array数组RDD

创建RDD

val listRDD: RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))

转换RDD

val resRDD: RDD[Int] = listRDD.flatMap(list => list)

结果输出

val r = resRDD.collect().mkString(",")
println(r)
----------
1,2,3,4

glom()

  • 作用:将每一个分区形成一个数组,形成新的RDD类型是RDD[Array[T]]

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10,3)

转换RDD

val arrayRDD: RDD[Array[Int]] = listRDD.glom()

结果输出

分区不能平均存放数据时,多余的数据会往后面的分区存放

val res = arrayRDD.collect()
res.foreach(nums => println(nums.mkString(",")))
------------------------------------------------
1,2,3
4,5,6
7,8,9,10

groupBy()

  • 作用:分组,按照传入函数返回值进行分组。将相同的key对应的值放入一个迭代器,返回一个元组:RDD[(Int, Iterable[Int])],前一个 Int 是传入函数返回值。
  • 需求:区分奇偶数

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

val resRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(_ % 2)

结果输出

val r = resRDD.collect()
r.foreach(println)
------------------
(0,CompactBuffer(2, 4, 6, 8, 10))
(1,CompactBuffer(1, 3, 5, 7, 9))

filter(func)

  • 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算返回值为true的输入元素组成。
  • 需求:过滤奇数,留下偶数

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

val resRDD: RDD[Int] = listRDD.filter(_ % 2 == 0)

结果输出

val r = resRDD.collect()
println(r.mkString(","))
------------------
2,4,6,8,10

sample(withReplacement,fraction,[seed])

  • 作用:数据抽样
    • withReplacement:布尔值
      • true: 有放回的抽样,采用PoissonSampler抽样器(Poisson分布),
      • false:无放回的抽样,采用BernoulliSampler的抽样器
    • fraction:
      • 当为 true 时,这就是数学中的期望
      • 当为 false 时,这就是概率,值的范围[0,1]
    • seed:随机数种子,不好把握,保持默认就好
  • 需求:创建一个RDD(1-10),从中选择不放回抽样

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换RDD

val resRDD: RDD[Int] = listRDD.sample(false,0.4)

结果输出

其实每次抽样结果的 值的个数 的个数都不一样,但会保持在一个数学算法的范围里

val r = resRDD.collect()
println(r.mkString(","))
------------------------
5,7,9,10

distinct([numPartitions])

  • 作用:去重。因为去重之后,数据会减少,各个数据原本所在的分区会被洗牌打乱,所以其中就有 shuffle 操作,参数是去重后所产生的分区数量,可选。
  • 需求:去重,指定两个分区。

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(List(6,2,1,2,8,3,5,5,4,1,2,2))

转换RDD

val resRDD: RDD[Int] = listRDD.distinct(2)

结果输出

如果不指定分区,输出的数据会自动排序

val r = resRDD.collect()
println(r.mkString(","))
-----------------------
4,6,8,2,1,3,5

coalesce(numPartitions,[shuffle])

  • 作用:减少分区数。用于大数据集过滤后,提高小数据集的执行效率。参数如果大于原本的分区数,则分区数不变,默认 false 不进行 shuffle 操作。
  • 需求:减少分区数。

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10,6)

转换RDD

val newPartition: RDD[Int] = listRDD.coalesce(3)

结果输出

如果不指定分区,输出的数据会自动排序

println("原本的分区数:"+listRDD.partitions.size)
println("新的分区数:"+newPartition.partitions.size)
---------------------------------------------------
原本的分区数:6
新的分区数:3

repartition(numPartitions: Int)

  • 作用:根据分区数,重新通过网络随机洗牌所有数据,分区数可以比原来多也可以少,一定会进行 shuffle 操作。

通过源码发现,其实在底层也调用了 coalesce() 方法,并且 shuffle = true

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

sortBy(func,[ascending],[numPartitions])

  • 作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序。
    • func :处理函数
    • ascending:默认 true 升序,false 降序
    • numPartitions:处理后的分区数
  • 需求创建一个RDD,按照不同的规则,进行讲叙排序,并指定 3 个分区

创建RDD

val listRDD: RDD[Int] = sc.makeRDD(List(5,8,2,4,6,9,1),8)

转换RDD

val resRDD: RDD[Int] = listRDD.sortBy(x => x,false,3)

结果输出

如果不指定分区,输出的数据会自动排序

println("排序前分区数:"+ listRDD.partitions.length)
println("排序后分区数:"+ resRDD.partitions.length)
val r = resRDD.collect()
println(r.mkString(","))
-----------------------
4,6,8,2,1,3,5
排序前分区数:8
排序后分区数:3
9,8,6,5,4,2,1
发表评论 / Comment

用心评论~