Spark RDD高级编程 之 累加器

累加器使用场景

当分析一个日志文件,如果我们想计算文件中所有空行的数量,这时就可以使用累加器更方便的统计

一个小例子

统计数组空元素的个数

val strRDD = sc.makeRDD(List("Hadoop", "Java", "", "Python", ""))

    // 定义结果
    var res  = 0
    strRDD.foreach{
        // 是空就加 1
        str => (if (str.equals("")){res+=1})
    }
    println(s"空元素有 $res 个")
}

运行结果

这是错误的结果

空元素有 0 个

出现 0 的原因分析

RDD里面的数据是在不同的分区里面,一个分区会发给一个 Executor 执行,问题就出在这里,当每个 Executor 累加完成,每个 Executor又怎么把结果 组合起来,所以最主要的原因就是 var res = 0 在 Driver 里面,而作为累加的逻辑Executor 里面,它们无法互通。

自带的累加器

Spark 自带了一些累加器

  • longAccumulator:long 数值的累加
  • doubleAccumulator:double 数值得累加
  • collectionAccumulator:空列表的,增加元素的方式累加
val conf = new SparkConf().setMaster("local[*]").setAppName("ljq")
val sc = new SparkContext(conf)

// 定义一个 long 类型的累加器
var res  = sc.longAccumulator

val strRDD = sc.makeRDD(List("Hadoop", "Java", "", "Python", ""))
strRDD.foreach{
    // 调用 add() 方法累加
    str => (if (str.equals("")){res.add(1)})
}
println(s"空元素有 ${res.value} 个")
----------------------------
空元素有 2 个

自定义累加器

首先分析一下 longAccumulator() 源码

里面只是 创建一个对象,注册累加器,然后返回累加器

def longAccumulator: LongAccumulator = {
    // 创建 LongAccumulator 对象
    val acc = new LongAccumulator
    // 注册累加器,开始 Spark 的累加器
    register(acc)
    acc
  }

继续查看创建的 LongAccumulator 对象源码会发现继承了 AccumulatorV2接口,查看AccumulatorV2源码

继承 AccumulatorV2 抽象类,自定义累加器需要重写这些方法

abstract class AccumulatorV2[IN, OUT] extends Serializable {
    /**
   * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
   * value; for a list accumulator, Nil is zero value.
   */
    def isZero: Boolean

    /**
   * Creates a new copy of this accumulator.
   */
    def copy(): AccumulatorV2[IN, OUT]

    /**
   * Resets this accumulator, which is zero value. i.e. call `isZero` must
   * return true.
   */
    def reset(): Unit

    /**
   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
   */
    def add(v: IN): Unit

    /**
   * Merges another same-type accumulator into this one and update its state, i.e. this should be
   * merge-in-place.
   */
    def merge(other: AccumulatorV2[IN, OUT]): Unit

    /**
   * Defines the current value of this accumulator
   */
    def value: OUT
}

自定义累加器

可以模仿着longAccumulator 来写

class MyAccumulatorV2 extends AccumulatorV2[String, util.ArrayList[String]] {

    // 累加器存数据数组
    var wordArrays =  new util.ArrayList[String]()

    // 判断 累加器是否是初始化状态
    override def isZero: Boolean = wordArrays.size() == 0

    // 复制累加器对象
    override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
        new MyAccumulatorV2()
    }

    // 重置累加器
    override def reset(): Unit = {
        wordArrays.clear()
    }

    // 向累加器放数据
    override def add(v: String): Unit = {
        if (v.contains("h")){
            wordArrays.add(v)
        }
    }

    // 把其他分区的累加器的数据合并
    override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
        wordArrays.addAll(other.value)
    }

    // 返回累加器的结果
    override def value: util.ArrayList[String] = wordArrays
}

使用

val conf = new SparkConf().setMaster("local[*]").setAppName("ljq")
val sc = new SparkContext(conf)

// 创建累加器对象
val res = new MyAccumulatorV2
// 向 Spark 注册累加器
sc.register(res)

val strRDD = sc.makeRDD(List("hadoop", "Java", "hbase", "Python", "hive"))
// 因为累加条件 累加器 add() 方法已经定义,所以直接添加
strRDD.foreach(res.add)
println(s"含 h 的单词是 ${res.value} ")
---------------------------------------
含 h 的单词是 [hbase, hadoop, Python, hive]
发表评论 / Comment

用心评论~