行动算子
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换结果为数组
val resRDD: Array[Int] = listRDD.collect()
reduce(f: (T, T) => T)
- 作用:通过函数把数据
两两进行聚合
,先聚合分区内
数据,再聚合分区间
数据。
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
转换结果为数组
如果是 K-V 类型,也是两两传入 K-V 对进行聚合。
val res: Int = listRDD.reduce(_ + _) println(res) --------------- 55
first()
- 作用:返回 RDD 中
第一个数据
take(num: Int)
- 作用:返回该
RDD
的前n个元素
组成的数组
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1))
取前三条
val res: Array[Int] = listRDD.take(3) println(res.mkString(",")) --------------- 5,4,7
takeOrdered(num: Int)
- 作用:返回该
RDD排序后
的前n个元素
组成的数组
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1))
取前三条
val res: Array[Int] = listRDD.takeOrdered(3) println(res.mkString(",")) --------------- 1,4,5
count()
- 作用:返回RDD中
元素的个数
统计
val listRDD: RDD[Int] = sc.makeRDD(List(5,4,7,6,1)) val res: Long= listRDD.count() println(res) ------------ 5
aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
- 作用:使
分区内
和分区间
可以使用不同的函数
聚合。- zeroValue:两两聚合时
第一次调用
的值。 - seqOp:分区内聚合的函数。
- combOp:分区间聚合的函数。
- zeroValue:两两聚合时
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10,2)
聚合打印
这个初始值在每个分区内
聚合时会调用一次,还在分局间
聚合时会调用一次。
val res: Int = listRDD.aggregate(0)(_+_,_+_) println(res) ------------ 55
fold(zeroValue: T)(op: (T, T) => T)
- 作用:是
aggregate() 的精简版
,分区内
和分区间
进行相同的聚合操作。其实底层还是先聚合 分区内
,再聚合分区间
。- zeroValue:两两聚合时
第一次调用
的值。 - op:
分区内
和分区间
聚合的函数
- zeroValue:两两聚合时
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10,2)
聚合打印
这个初始值在每个分区内
聚合时会调用一次,还在分局间
聚合时会调用一次,跟 aggregate()
一样
val res: Int = listRDD.fold(0)( _+ _) println(res) --------------- 55
文件保存
把 RDD 数据通过文件的方式保存
saveAsTextFile(路径):
将数据集的元素以textfile的形式
保存到HDFS文件系统
或者其他支持的文件系统
,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsObjectFile(路径):
将数据集中的元素以Hadoop sequencefile的格式
保存到指定的目录下,可以是 HDFS
或者其他 Hadoop支持
的文件系统。
saveAsSequenceFile(路径):
用于将RDD中的元素序列化成对象,存储到文件中。
countByKey()
- 作用:针对
(K,V)
类型的 RDD,返回一个(K,Int)的Map
,表示每一个 K 对应的元素个数。
创建 RDD
val listRDD: RDD[String] = sc.makeRDD(List("a", "b", "c", "a")) val kvRDD: RDD[(String, Int)] = listRDD.map((_, 1))
统计打印
val slMap: collection.Map[String, Long] = kvRDD.countByKey() println(slMap) -------------- Map(a -> 2, b -> 1, c -> 1)
foreach()
- 作用:这个
foreach()
是对 RDD 操作,没有返回值。与 Scala 里的用法一样,只是使用的对象不一样。
创建 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 5)
遍历打印
listRDD.foreach(println) -------------- 5 2 3 1 4
版权声明:《 Spark Action 行动算子 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-17 12:02:25