传递函数的场景
当我自己封装好一个 RDD 的算子,需要使用,此时需要主要的是,初始化工作是在Driver 端
进行的,而实际运行程序是在Executor端
进行的,这就涉及到了跨进程通信
,是需要把对象序列化的。
封装一个算子
这个是传递一个变量
过滤掉 RDD 小于 num 的数值
,并返回一个 RDD。
class MyFilter(num: Int) { // 过滤掉 RDD 小于 num 的数值 def filterInt(rdd: RDD[Int]): RDD[Int] ={ rdd.filter(_ > num) } }
使用这个算子
val conf = new SparkConf().setMaster("local[*]").setAppName("p") val sc = new SparkContext(conf) val intRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9)) // 实例化封装的方法,传入 5 val myFilter = new MyFilter(5) // 调用算子方法 val resRDD: RDD[Int] = myFilter.filterInt(intRDD) resRDD.foreach(println)
运行会发现报一个错
对象没有实例化
- object not serializable (class: cn.bigdata.MyFilter, value: cn.bigdata.MyFilter@5462f059)
修改封装的方法
报错的主要原因是Executor端
拿不到需要变量、或者方法
,所以需要序列化传送
,就有了把传进来的变量
赋值给方法内部的变量
这样的操作,下面还有另外的操作。
class MyFilter(num: Int) { // 过滤掉 RDD 小于 num 的数值 def filterInt(rdd: RDD[Int]): RDD[Int] ={ val n = num rdd.filter(_ > n) } }
封装算子2
这是传递一个方法
只是把上面的判断
另外写成了一个函数
class MyFilter(num: Int) { // 判断大小返回布尔值 def bool(i: Int): Boolean = { i > num } // 过滤掉 RDD 小于 num 的数值 def filterInt(rdd: RDD[Int]): RDD[Int] ={ rdd.filter(bool(_)) } }
使用算子
val conf = new SparkConf().setMaster("local[*]").setAppName("p") val sc = new SparkContext(conf) val intRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9)) // 实例化封装的方法 val myFilter = new MyFilter(5) val resRDD: RDD[Int] = myFilter.filterInt(intRDD) resRDD.foreach(println)
也会报同一个错
对象没有序列化
- object not serializable (class: cn.bigdata.MyFilter, value: cn.bigdata.MyFilter@27f643e)
因为 装饰者模式 的存在,自己封装的操作 只需要继承特质Serializable
就好,上面的 传递变量也可以这样。
class MyFilter(num: Int) extends Serializable { // 判断大小返回布尔值 def bool(i: Int): Boolean = { i > num } // 过滤掉 RDD 小于 num 的数值 def filterInt(rdd: RDD[Int]): RDD[Int] ={ rdd.filter(bool(_)) } }
版权声明:《 Spark RDD 中的变量 & 函数传递 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-17 14:02:27