累加器使用场景
当分析一个日志文件,如果我们想计算文件中所有空行的数量,这时就可以使用累加器更方便的统计
一个小例子
统计数组空元素的个数
val strRDD = sc.makeRDD(List("Hadoop", "Java", "", "Python", "")) // 定义结果 var res = 0 strRDD.foreach{ // 是空就加 1 str => (if (str.equals("")){res+=1}) } println(s"空元素有 $res 个") }
运行结果
这是错误的结果
空元素有 0 个
出现 0 的原因分析
RDD里面的数据是在不同的分区
里面,一个分区
会发给一个 Executor 执行
,问题就出在这里,当每个 Executor
累加完成,每个 Executor
又怎么把结果 组合起来
,所以最主要的原因就是 var res = 0 在 Driver 里面
,而作为累加的逻辑
在 Executor
里面,它们无法互通。
自带的累加器
Spark 自带了一些累加器
- longAccumulator:long 数值的累加
- doubleAccumulator:double 数值得累加
- collectionAccumulator:空列表的,增加元素的方式累加
val conf = new SparkConf().setMaster("local[*]").setAppName("ljq") val sc = new SparkContext(conf) // 定义一个 long 类型的累加器 var res = sc.longAccumulator val strRDD = sc.makeRDD(List("Hadoop", "Java", "", "Python", "")) strRDD.foreach{ // 调用 add() 方法累加 str => (if (str.equals("")){res.add(1)}) } println(s"空元素有 ${res.value} 个") ---------------------------- 空元素有 2 个
自定义累加器
首先分析一下 longAccumulator() 源码
里面只是 创建一个对象,注册累加器,然后返回累加器
def longAccumulator: LongAccumulator = { // 创建 LongAccumulator 对象 val acc = new LongAccumulator // 注册累加器,开始 Spark 的累加器 register(acc) acc }
继续查看创建的
LongAccumulator
对象源码会发现继承了AccumulatorV2
接口,查看AccumulatorV2
源码
继承 AccumulatorV2
抽象类,自定义累加器需要重写
这些方法
abstract class AccumulatorV2[IN, OUT] extends Serializable { /** * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero * value; for a list accumulator, Nil is zero value. */ def isZero: Boolean /** * Creates a new copy of this accumulator. */ def copy(): AccumulatorV2[IN, OUT] /** * Resets this accumulator, which is zero value. i.e. call `isZero` must * return true. */ def reset(): Unit /** * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. */ def add(v: IN): Unit /** * Merges another same-type accumulator into this one and update its state, i.e. this should be * merge-in-place. */ def merge(other: AccumulatorV2[IN, OUT]): Unit /** * Defines the current value of this accumulator */ def value: OUT }
自定义累加器
可以模仿着longAccumulator
来写
class MyAccumulatorV2 extends AccumulatorV2[String, util.ArrayList[String]] { // 累加器存数据数组 var wordArrays = new util.ArrayList[String]() // 判断 累加器是否是初始化状态 override def isZero: Boolean = wordArrays.size() == 0 // 复制累加器对象 override def copy(): AccumulatorV2[String, util.ArrayList[String]] = { new MyAccumulatorV2() } // 重置累加器 override def reset(): Unit = { wordArrays.clear() } // 向累加器放数据 override def add(v: String): Unit = { if (v.contains("h")){ wordArrays.add(v) } } // 把其他分区的累加器的数据合并 override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = { wordArrays.addAll(other.value) } // 返回累加器的结果 override def value: util.ArrayList[String] = wordArrays }
使用
val conf = new SparkConf().setMaster("local[*]").setAppName("ljq") val sc = new SparkContext(conf) // 创建累加器对象 val res = new MyAccumulatorV2 // 向 Spark 注册累加器 sc.register(res) val strRDD = sc.makeRDD(List("hadoop", "Java", "hbase", "Python", "hive")) // 因为累加条件 累加器 add() 方法已经定义,所以直接添加 strRDD.foreach(res.add) println(s"含 h 的单词是 ${res.value} ") --------------------------------------- 含 h 的单词是 [hbase, hadoop, Python, hive]
版权声明:《 Spark RDD高级编程 之 累加器 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-18 13:02:54