Spark Action 行动算子

行动算子

  • 行动算子: 是把 RDD 的对象转换成 Scala 的相关类型
  • 转换算子: 就是RDD之间相互转换的算子

    collect()

  • 作用:在驱动程序中,以数组的形式返回RDD的所有元素。

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换结果为数组

val resRDD: Array[Int] = listRDD.collect()

reduce(f: (T, T) => T)

  • 作用:通过函数把数据两两进行聚合,先聚合分区内数据,再聚合分区间数据。

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

转换结果为数组

如果是 K-V 类型,也是两两传入 K-V 对进行聚合。

val res: Int = listRDD.reduce(_ + _)
println(res)
---------------
55

first()

  • 作用:返回 RDD 中第一个数据

take(num: Int)

  • 作用:返回该RDD前n个元素组成的数组

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1))

取前三条

val res: Array[Int] = listRDD.take(3)
println(res.mkString(","))
---------------
5,4,7

takeOrdered(num: Int)

  • 作用:返回该RDD排序后前n个元素组成的数组

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1))

取前三条

val res: Array[Int] = listRDD.takeOrdered(3)
println(res.mkString(","))
---------------
1,4,5

count()

  • 作用:返回RDD中元素的个数

统计

val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1))
val res: Long= listRDD.count()
println(res)
------------
5

aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)

  • 作用:使分区内分区间 可以使用不同的函数聚合。
    • zeroValue:两两聚合时第一次调用的值。
    • seqOp:分区内聚合的函数。
    • combOp:分区间聚合的函数。

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10,2)

聚合打印

这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次。

val res: Int = listRDD.aggregate(0)(_+_,_+_)
println(res)
------------
55

fold(zeroValue: T)(op: (T, T) => T)

  • 作用:是 aggregate() 的精简版分区内分区间进行相同的聚合操作。其实底层还是 先聚合 分区内再聚合分区间
    • zeroValue:两两聚合时第一次调用的值。
    • op:分区内分区间 聚合的函数

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 10,2)

聚合打印

这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次,跟 aggregate() 一样

val res: Int = listRDD.fold(0)( _+ _)
println(res)
---------------
55

文件保存

把 RDD 数据通过文件的方式保存

saveAsTextFile(路径):

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsObjectFile(路径):

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是 HDFS 或者其他 Hadoop支持的文件系统。

saveAsSequenceFile(路径):

用于将RDD中的元素序列化成对象,存储到文件中。

countByKey()

  • 作用:针对(K,V)类型的 RDD,返回一个(K,Int)的Map,表示每一个 K 对应的元素个数。

创建 RDD

val listRDD: RDD[String] = sc.makeRDD(List("a", "b", "c", "a"))
val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))

统计打印

val slMap: collection.Map[String, Long] = kvRDD.countByKey()
println(slMap)
--------------
Map(a -> 2, b -> 1, c -> 1)

foreach()

  • 作用:这个 foreach()是对 RDD 操作,没有返回值。与 Scala 里的用法一样,只是使用的对象不一样。

创建 RDD

val listRDD: RDD[Int] = sc.makeRDD(1 to 5)

遍历打印

listRDD.foreach(println)
--------------
5
2
3
1
4
发表评论 / Comment

用心评论~