前言
RDD整体上分为 Value类型
和 Key-Value类型
Value类型
map(func)
- 作用:返回一个新的RDD,该RDD由
每一个
输入元素经过func函数转换
后组成 - 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
创建RDD
val intRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
方法转换重点是每一个数据
val resRDD: RDD[Int] = intRDD.map(_ * 2)
结果输出
val r = resRDD.collect().mkString(",") println(r) ---------- 2,4,6,8,10,12,14,16,18,20
mapPartitions(func)
- 作用:类似于map,但独立地在RDD的每
一个分区
上运行,因此func的函数类型必须是(Interator[T]) => Iterator[U]
。- 效率要优于 map()
- 但分区数据太大容易造成内存溢出的错误(OOM)
- 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
创建RDD
val intRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
方法转换重点是每一个分区
,所以匿名函数``传入
是可迭代类型,返回
也要可迭代类型
val resRDD: RDD[Int] = intRDD.mapPartitions((iter)=> { // 这里的 map() 是 Scala 中的函数 iter.map(i => i*2) })
结果输出
val r = resRDD.collect().mkString(",") println(r) ---------- 2,4,6,8,10,12,14,16,18,20
mapPartitionsWithIndex(func)
- 作用:类似于mapPartitions,但func带有一个整数参数表示所在分区的索引值,因此func的函数类型必须是
(Int, Interator[T]) => Iterator[U]
; - 需求:创建一个RDD,使
元素
跟所在分区索引
形成一个元组组成一个新的RDD
创建RDD
val intRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
方法转换重点是每一个分区
val resRDD: RDD[(String, Int)] = intRDD.mapPartitionsWithIndex { // 有多个传入参数时,一般使用 case case (num, datas) => { datas.map(("分区" + num, _)) } }
结果输出
val r = resRDD.collect().mkString(",") println(r) ---------- (分区1,1),(分区3,2),(分区4,3),(分区6,4),(分区7,5),(分区9,6),(分区11,7),(分区12,8),(分区14,9),(分区15,10)
flatMap(func)
- 作用:类似于Scala中的flatMap(),每一个输入元素可以被
映射
为0或多个
输出元素(所以func应该返回一个序列
,而不是单一元素) - 需求:创建一个Array数组中的 List元素 RDD,映射成Array数组RDD
创建RDD
val listRDD: RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))
转换RDD
val resRDD: RDD[Int] = listRDD.flatMap(list => list)
结果输出
val r = resRDD.collect().mkString(",") println(r) ---------- 1,2,3,4
glom()
- 作用:将
每一个分区
形成一个数组
,形成新的RDD类型是RDD[Array[T]]
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10,3)
转换RDD
val arrayRDD: RDD[Array[Int]] = listRDD.glom()
结果输出
分区不能平均存放
数据时,多余的数据会往后面的分区
存放
val res = arrayRDD.collect() res.foreach(nums => println(nums.mkString(","))) ------------------------------------------------ 1,2,3 4,5,6 7,8,9,10
groupBy()
- 作用:分组,按照
传入函数
的返回值
进行分组。将相同的key对应的值
放入一个迭代器,返回一个元组:RDD[(Int, Iterable[Int])]
,前一个 Int 是传入函数返回值。 - 需求:区分奇偶数
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
val resRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(_ % 2)
结果输出
val r = resRDD.collect() r.foreach(println) ------------------ (0,CompactBuffer(2, 4, 6, 8, 10)) (1,CompactBuffer(1, 3, 5, 7, 9))
filter(func)
- 作用:
过滤
。返回一个新的RDD,该RDD由经过func函数计算
后返回值为true的输入元素
组成。 - 需求:过滤奇数,留下偶数
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
val resRDD: RDD[Int] = listRDD.filter(_ % 2 == 0)
结果输出
val r = resRDD.collect() println(r.mkString(",")) ------------------ 2,4,6,8,10
sample(withReplacement,fraction,[seed])
- 作用:数据抽样
- withReplacement:布尔值
- true: 有放回的抽样,采用PoissonSampler抽样器(Poisson分布),
- false:无放回的抽样,采用BernoulliSampler的抽样器
- fraction:
- 当为 true 时,这就是数学中的期望
- 当为 false 时,这就是概率,值的范围[0,1]
- seed:随机数种子,不好把握,保持默认就好
- withReplacement:布尔值
- 需求:创建一个RDD(1-10),从中选择不放回抽样
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换RDD
val resRDD: RDD[Int] = listRDD.sample(false,0.4)
结果输出
其实每次抽样结果的 值
和 值的个数
的个数都不一样,但会保持在一个数学算法的范围里
val r = resRDD.collect() println(r.mkString(",")) ------------------------ 5,7,9,10
distinct([numPartitions])
- 作用:去重。因为去重之后,数据会减少,各个数据原本所在的
分区会被洗牌打乱
,所以其中就有shuffle 操作
,参数是去重后所产生的分区数量
,可选。 - 需求:去重,指定两个分区。
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(List(6,2,1,2,8,3,5,5,4,1,2,2))
转换RDD
val resRDD: RDD[Int] = listRDD.distinct(2)
结果输出
如果不指定分区,输出的数据会自动排序
val r = resRDD.collect() println(r.mkString(",")) ----------------------- 4,6,8,2,1,3,5
coalesce(numPartitions,[shuffle])
- 作用:减少分区数。用于大数据集过滤后,提高小数据集的执行效率。参数如果
大于
原本的分区数,则分区数不变
,默认false
不进行 shuffle
操作。 - 需求:减少分区数。
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10,6)
转换RDD
val newPartition: RDD[Int] = listRDD.coalesce(3)
结果输出
如果不指定分区,输出的数据会自动排序
println("原本的分区数:"+listRDD.partitions.size) println("新的分区数:"+newPartition.partitions.size) --------------------------------------------------- 原本的分区数:6 新的分区数:3
repartition(numPartitions: Int)
- 作用:根据
分区数
,重新通过网络随机洗牌所有数据,分区数可以比原来多也可以少
,一定会进行 shuffle 操作。
通过源码发现,其实在底层也调用了 coalesce() 方法
,并且 shuffle = true
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
sortBy(func,[ascending],[numPartitions])
- 作用:使用func
先对数据进行处理
,按照处理后
的数据比较结果排序
,默认为升序。- func :处理函数
- ascending:默认 true 升序,false 降序
- numPartitions:处理后的分区数
- 需求创建一个RDD,按照不同的规则,进行讲叙排序,并指定 3 个分区
创建RDD
val listRDD: RDD[Int] = sc.makeRDD(List(5,8,2,4,6,9,1),8)
转换RDD
val resRDD: RDD[Int] = listRDD.sortBy(x => x,false,3)
结果输出
如果不指定分区,输出的数据会自动排序
println("排序前分区数:"+ listRDD.partitions.length) println("排序后分区数:"+ resRDD.partitions.length) val r = resRDD.collect() println(r.mkString(",")) ----------------------- 4,6,8,2,1,3,5 排序前分区数:8 排序后分区数:3 9,8,6,5,4,2,1
版权声明:《 Spark RDD 算子 (Value类型) 》为明非原创文章,转载请注明出处!
最后编辑:2020-2-14 15:02:45