Spark RDD 中的变量 & 函数传递

传递函数的场景

当我自己封装好一个 RDD 的算子,需要使用,此时需要主要的是,初始化工作是在Driver 端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要把对象序列化的。

封装一个算子

这个是传递一个变量

过滤掉 RDD 小于 num 的数值,并返回一个 RDD。

class MyFilter(num: Int) {
    // 过滤掉 RDD 小于 num 的数值
    def filterInt(rdd: RDD[Int]): RDD[Int] ={
        rdd.filter(_ > num)
    }
}

使用这个算子

val conf = new SparkConf().setMaster("local[*]").setAppName("p")
val sc = new SparkContext(conf)
val intRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
// 实例化封装的方法,传入 5 
val myFilter = new MyFilter(5)
// 调用算子方法
val resRDD: RDD[Int] = myFilter.filterInt(intRDD)
resRDD.foreach(println)

运行会发现报一个错

对象没有实例化

- object not serializable (class: cn.bigdata.MyFilter, value: cn.bigdata.MyFilter@5462f059)

修改封装的方法

报错的主要原因是Executor端 拿不到需要变量、或者方法,所以需要序列化传送,就有了把传进来的变量赋值给方法内部的变量这样的操作,下面还有另外的操作。

class MyFilter(num: Int)  {

    // 过滤掉 RDD 小于 num 的数值
    def filterInt(rdd: RDD[Int]): RDD[Int] ={
        val n = num
        rdd.filter(_ > n)
    }
}

封装算子2

这是传递一个方法

只是把上面的判断另外写成了一个函数

class MyFilter(num: Int)  {

    // 判断大小返回布尔值
    def bool(i: Int): Boolean = {
        i > num
    }

    // 过滤掉 RDD 小于 num 的数值
    def filterInt(rdd: RDD[Int]): RDD[Int] ={
        rdd.filter(bool(_))
    }
}

使用算子

val conf = new SparkConf().setMaster("local[*]").setAppName("p")
val sc = new SparkContext(conf)
val intRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
// 实例化封装的方法
val myFilter = new MyFilter(5)
val resRDD: RDD[Int] = myFilter.filterInt(intRDD)
resRDD.foreach(println)

也会报同一个错

对象没有序列化

- object not serializable (class: cn.bigdata.MyFilter, value: cn.bigdata.MyFilter@27f643e)

因为 装饰者模式 的存在,自己封装的操作 只需要继承特质Serializable就好,上面的 传递变量也可以这样。

class MyFilter(num: Int) extends Serializable {

    // 判断大小返回布尔值
    def bool(i: Int): Boolean = {
        i > num
    }

    // 过滤掉 RDD 小于 num 的数值
    def filterInt(rdd: RDD[Int]): RDD[Int] ={
        rdd.filter(bool(_))
    }
}
发表评论 / Comment

用心评论~