未分类

留存率说明在互联网行业当中,因为拉新或推广的活动把用户引过来,用户开始访问网站/应用,但是经过一段时间可能就会有一部分客户逐渐流失了。留存率定义为用户在某段时间内开始使用网站/应用(一般定义是注册),经过一段时间后,仍然继续使用的人被认作是留存用户。留存率体现了网站/应用的质量和保留用户的能力。留存率计算方法:留存率=登录用户数/新增用户数*100%一般统计周期为天,常见的周期维度有次日、7日、14日/15日、30日、60日、90日)比如:次日留存率:(第一天新增用户数,第2天还登录的用户数)/第一天总注册用户数7日留存率:(第一天新增用户数,第8天还登录的用户数)/第一天总注册用户数30日留存率:(第一天新增用户数,第31天还登录的用户数)/第一天总注册用户Spark计算难点留存率涉及到两组数据的计算(新增人数&某天留存人数)计算思路思路分解数据例子:日期-用户ID2020-01-01,17731012020-01-01,17731022020-01-02,17731012020-01-02,17731032020-01-03,17731012020-01-03,1773102自定义累加器官方的提供的累加器里没有mutable.Map()的累加器优点:能够通过Key方便的取到Value。就是通过日期,方便的取到用户ID。//自定义Map累加器classmapAccumulatorextendsAccumulatorV2[Array[(String,Iterable[String])],mutable.Map[String,Iterable[String]]]{//累加器存数据MapvardateToUser:mutable.Map[String,Iterable[String]]=mutable.Map[String,Iterable[String]]()//判断累加器是否是初始化状态overridedefisZero:Boolean=dateToUser.isEmpty//复制累加器对象overridedefcopy():AccumulatorV2[Array[(String,Iterable[String])],mutable.Map[String,Iterable[String]]]={newmapAccumulator()}//重置累加器overridedefreset():Unit={dateToUser.clear()}//向累加器放数据overridedefadd(a:Array[(String,Iterable[String])]):Unit={valdate:String=a(0)._1valusers:Iterable[String]=a(0)._2dateToUser(date)=users}//把其他分区的累加器的数据合并overridedefmerge(other:AccumulatorV2[Array[(String,Iterable[String])],mutable.Map[String,Iterable[String]]]):Unit={other.value.foreach(line=>dateToUser(line._1)=line._2)}//返回累加器的结果overridedefvalue:mutable.Map[String,Iterable[String]]=dateToUser}注册累加器//注册累加器valuser_id_lists=newmapAccumulator()sc.register(user_id_lists)读取数据&分组valcsv_rdd=sc.textFile("文件")//相同日期分组valgroup_rdd=csv_rdd.groupByKey()存入累加器//按照分区遍历提高性能group_rdd.foreachPartition{partition=>partition.foreach{//日期为Key用户ID为Value,形成Map()添加到累加器line=>user_id_lists.add(Array(line._1->line._2.toSet))}}广播变量取出累加器里的数据进行广播变量//广播变量为累加器的值valuser_id_map:Broadcast[mutable.Map[String,Iterable[String]]]=sc.broadcast(user_id_lists.value)计算留存率再次遍历前面分组的RDD,通过日期取出广播变量里的用户IDvalres_rdd=group_rdd.mapPartitions{partition=>partition.map{line=>//获取当前日期的获取用户列表,没有获取到返回空的迭代valat_user_id:Iterable[String]=user_id_map.value.getOrElse(line._1,Iterable())//获取之前日期这个函数在文章后面valyesterday=getIntervalDay(line._1,-1)//获取之前日期的用户列表,没有获取到返回空的迭代valbefore_user_id:Iterable[String]=user_id_map.value.getOrElse(yesterday,Iterable())//获取之前日期总人数valbefore_all_user:Double=before_user_id.size.toDouble//留存人数:两日期用户列表去重取交集valkeep_user:Double=before_user_id.toSet.intersect(at_user_id.toSet).size.toDouble//留存率留存人数/前日期总人数varkeep_rate:Double=keep_user/before_all_userif(keep_rate.isNaN){keep_rate=0}//返回日期-留存率(line._1,keep_rate)}}计算间隔x天的日期//获取指定间隔日期defgetIntervalDay(date:String,num:Int):String={/*date:yyyy-MM-dd格式日期字符串num:间隔天数,有正负数之分。+(前面)-(后面)-1就是昨天,1就是明天*/vardateFormat:SimpleDateFormat=newSimpleDateFormat("yyyy-MM-dd")varcal:Calendar=Calendar.getInstance()cal.setTime(dateFormat.parse(date))cal.add(Calendar.DATE,num)varday=dateFormat.format(cal.getTime())day}

未分类

概述spark-submit可以提交任务到spark集群执行,也可以提交到hadoop的yarn集群执行。最简单提交命令本地模式提交spark-submit\--class类名jar包参数不同搭建的提交方式主要模板spark-submit\--class主类\--master提交地址\--deploy-mode提交方式\其他参数\运行主程序程序参数自生调度器管理俗称的集群模式,一个master,多个worker,这时是Spark自身的资源调度器管理。spark-submit\--class主类\--masterspark://master:6066\--deploy-modecluster\其他参数\运行主程序程序参数提交到Yarn管理spark-submit\--class主类\--masterspark://master:6066\--deploy-modeyarn\其他参数\运行主程序程序参数参数详解--class:执行主类,Python不需要--master:提交的地址:spark://master:6066、yarn、local--deploy-mode:默认cliebt,cluster--name:程序名--driver-memory:driver内存,默认为1G--driver-cores:driverCPU数,默认1核。--executor-memory:每个executor的内存,默认是1G--executor-core:每个executor的核数。在yarn或者standalone下使用--total-executor-cores:所有executor总共的核数。仅仅在mesos或者standalone下使用--num-executors:启动的executor数量。默认为2。在yarn下使用Spark集群例子如果有些内存核心不指定,就不能很好的利用集群的算力spark-submit\--masterspark://node1:6066\--deploy-modecluster\--driver-memory14g\--driver-cores8\--executor-memory14g\--executor-cores4\--total-executor-cores24\--classtwo/root/jar/bigdataboy.jarobs://bigdata/bigdataboyYarn提交例子spark-submit\--masteryarn\--deploy-modecluster\--driver-cores5\--driver-memory10g\--executor-memory25g\--executor-cores4\--class类名Jar包参数全部参数--master提交地址spark://host:port,mesos://host:port,yarn,k8s://https://host:port,默认:local[*]--deploy-mode提交模式默认:client(单机),cluster(集群)--class程序主类Java&Scala程序--name程序名字application的名字--jarsJARS程序使用的Jar包路径,用逗号分割--packagesmaven的Jar包名称需要与--repositories一起使用--repositories需要与--package一使用(--packagesmysql:mysql-connector-java:5.1.27--repositorieshttp://maven.aliyun.com/nexus/content/groups/public/)--exclude-packages为了避免冲突而指定不包含的package--py-filesPython程序的路径支持者.zip,.egg,or.py压缩文件或者文件--filesFILESComma-separatedlistoffilestobeplacedintheworkingdirectoryofeachexecutor.FilepathsofthesefilesinexecutorscanbeaccessedviaSparkFiles.get(fileName).--confPROP=VALUE额外的配置--properties-fileFILE加载配置路径,默认conf/spark-defaults.conf.--driver-memoryMEMMemoryfordriver(e.g.1000M,2G)(Default:1024M).--driver-java-options传递给driver的额外Java选项。--driver-library-path传递给driver的额外库路径项。--driver-class-pathdriver的类路径,用--jars添加的jar包会自动包含在类路径里--executor-memoryMEMMemoryperexecutor(e.g.1000M,2G)(Default:1G).--help,-h显示此帮助消息并退出。--verbose,-v打印额外的调试输出。--version,打印当前Spark的版本仅支持Cluster部署模式:--driver-coresNUMdriver使用的核心数,仅在集群模式下使用(Default:1).仅支持standalone、Mesos的集群部署模式:--supervise如果给定,则在失败时重新启动驱动程序。--killSUBMISSION_ID如果给定,则杀死指定的驱动程序。--statusSUBMISSION_ID如果给定,请求指定的驱动程序的状态。仅支持standaloneandMesos部署模式:--total-executor-coresNUM所有executors的核心总数。仅支持standaloneandYARN部署模式:--executor-coresNUM每个executors的核心数。(Default:1inYARNmode,或者standalone模式worker的所有核数。)仅支持YARN部署模式:--queueQUEUE_NAMEYarn提交的对列名称(Default:"default").--num-executorsNUMexecutors启动的数量(Default:2).--archivesARCHIVESCommaseparatedlistofarchivestobeextractedintotheworkingdirectoryofeachexecutor.--principalPRINCIPALPrincipaltobeusedtologintoKDC,whilerunningonsecureHDFS.--keytabKEYTABThefullpathtothefilethatcontainsthekeytabfortheprincipalspecifiedabove.ThiskeytabwillbecopiedtothenoderunningtheApplicationMasterviatheSecureDistributedCache,forrenewingtheloginticketsandthedelegationtokensperiodically.

2020-3-26 1157 0
未分类

概述Streaming的状态更新分为两种无状态更新:每个时间间隔的数据相对独立,不会进行累计,就像之前的单词统计一样,不使用个方法就行。有状态更新:会把现在统计的结果与之前的结果进行合并累计,使用有状态更新必须设置检查点。updateStateByKeyS:ClassTag=>Option[S]):DStream[(K,S)]这个方法有很多种参数,注重这个updateFunc函数参数。参数updateFunc:(Seq[V],Option[S])=>Option[S]Seq[V]:一个集合,集合里是当前计算结果相同Key的Value。Option[S]:该对象能获取之前时间段的相同Key的Value。实时单词累计统计//Spark通用配置valconf=newSparkConf().setMaster("local[*]").setAppName("up")//Streaming配置valcontext=newStreamingContext(conf,Seconds(1))//需要使用状态更新必须设置检查点机制context.checkpoint("checkdir")//监控端口valdataDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",5656)//处理数据,形成K-V对valmapDstream:DStream[(String,Int)]=dataDStream.flatMap(_.split("")).map((_,1))//新老数据合并更新valstateDStream:DStream[(String,Int)]=mapDstream.updateStateByKey{case(seq,option)=>{//获取之前的Key,如果没有取到,则返回0,说明是新出现的Keyvalstate:Int=option.getOrElse(0)//新旧Key的合并valkey:Int=state+seq.sum//返回Option对象Option(key)}}//打印结果stateDStream.print()//启动,持久化运行context.start()context.awaitTermination()

未分类

添加依赖注意SparkCore的版本和Scala的版本这里有个坑,下面依赖是Maven仓库官方的,如果运行报错,就把<scope>provided</scope>这个删掉<!--https://mvnrepository.com/artifact/org.apache.spark/spark-streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version><scope>provided</scope></dependency>大概使用流程SparkStreaming并不是真真意义上的实时处理,它有一个时间周期的概念,隔多长时间处理一次数据。这个间隔越小,就越接近实时。创建StreamingContext对象这里的时间周期,有几个官方提供的简单的方式,Milliseconds(毫秒)、Seconds(秒)、Minutes(分钟)…//Spark运行配置valconf=newSparkConf().setMaster("local[*]").setAppName("streaming")//创建SparkStreaming上下文对象和采集周期valcontext=newStreamingContext(conf,Seconds(3))设置监控的主机和端口SparkStreaming还能监控路径里文件的变化,但是一般不使用,有时候还会不生效,因为这方面Flume比它更强。设置了监控主机和端口后,那么就能得到DStream数据集了,底层也是RDD,完全能使用RDD的方法,并且还有新的方法。vallineDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",8888)保持Streaming的运行//启动Streaming,开始接收数据和处理流程context.start()//等待线程终止,保持SparkStreaming的持续运行,等待处理结果context.awaitTermination()一个小例子单词实时统计这个例子是无状态化转换的,每个周期的数据是独立的,没有连续统计。整体代码//Spark运行配置valconf=newSparkConf().setMaster("local[*]").setAppName("streaming")//创建SparkStreaming上下文对象,跟采集周期valcontext=newStreamingContext(conf,Seconds(3))//设置监控的主机及端口,返回DStream数据集vallineDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",8888)//对DStream进行处理valres:DStream[(String,Int)]=lineDStream.flatMap(_.split("")).map((_,1)).reduceByKey(_+_)//打印结果res.print()//启动Streaming,开始接收数据和处理流程context.start()//等待线程终止,保持SparkStreaming的持续运行,等待处理结果context.awaitTermination()使用natcat向指定端口发送数据Windows:只需要下载、解压、配置环境变量,开启命令nc-l-p端口号Linux:安装命令yuminstallnc,开启命令nc-lk端口号

2020-2-24 627 0
未分类

问题出现这个异常已在SpakrSQL2.0.1中修复使用IDEA配置好SparkSQL2.0.0开始使用//构建会话valss:SparkSession=SparkSession.builder.master("local").appName("WordCount").getOrCreate()//读取文件valjsonData:DataFrame=ss.read.json("indata/data.json")运行会产生一个路径异常,这个路径SparkSQL内置的Hive创表保存数据默认路径的。java.net.URISyntaxException:RelativepathinabsoluteURI:file:E:/IdeaProject/SparkDemo/spark-warehouse问题解决//构建SparkSQL会话valss:SparkSession=SparkSession.builder.master("local").appName("WordCount").config("spark.sql.warehouse.dir","项目根路径").getOrCreate()问题解析查看源码发现,这个问题出在SQLConf.scala里面源码目录:org.apache.spark.sql.internal.SQLConf提取相关源码//默认配置模板valWAREHOUSE_PATH=SQLConfigBuilder("spark.sql.warehouse.dir").doc("Thedefaultlocationformanageddatabasesandtables.").stringConf.createWithDefault("file:${system:user.dir}/spark-warehouse")//替换模板里的路径,问题就出现这个替换的上defwarehousePath:String={getConf(WAREHOUSE_PATH).replace("${system:user.dir}",System.getProperty("user.dir"))}我们可以把这段逻辑简化一下使用这段代码System.getProperty("user.dir")替换路径,输出结果发现其中的含有\,这就是异常的原因valpath="file:${system:user.dir}/spark-warehouse".replace("${system:user.dir}",System.getProperty("user.dir"))println(path)--------------file:E:\IdeaProject\SparkDemo/spark-warehouse官方修复动态管方已在SparkSQL2.0.1版本中修复这个BUG修复代码如下valWAREHOUSE_PATH=SQLConfigBuilder("spark.sql.warehouse.dir").doc("Thedefaultlocationformanageddatabasesandtables.").stringConf.createWithDefault("${system:user.dir}/spark-warehouse")defwarehousePath:String={newPath(getConf(WAREHOUSE_PATH).replace("${system:user.dir}",System.getProperty("user.dir"))).toString}

未分类

Lineage(血统容错机制)Spark会将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。查看RDD的LineagetoDebugString每一步RDD都会记录着前面产生RDD的所有操作,以便产生错误的时候追溯回去重新计算。valconf=newSparkConf().setMaster("local[*]").setAppName("a")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4))valmapRDD=intRDD.map(_+2)valkvRDD=mapRDD.map(("a",_))valreRDD=kvRDD.reduceByKey(_+_)println(reRDD.toDebugString)-------------------(16)ShuffledRDD[3]atreduceByKeyat算子.scala:123[]+-(16)MapPartitionsRDD[2]atmapat算子.scala:122[]|MapPartitionsRDD[1]atmapat算子.scala:121[]|ParallelCollectionRDD[0]atmakeRDDat算子.scala:120[]查看依赖类型dependenciesvalintRDD=sc.makeRDD(List(1,2,3,4))valmapRDD=intRDD.map(_+2)valkvRDD=mapRDD.map(("a",_))valreRDD=kvRDD.reduceByKey(_+_)println(reRDD.dependencies)---------------------------List(org.apache.spark.ShuffleDependency@7923f5b3)依赖机制依赖值得是子RDD和父RDD之间的关系RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrowdependency)和宽依赖(widedependency)。窄依赖窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女`宽依赖宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生阶段划分阶段划分指Job在执行过程中,是否需要等待某一操作执行完成才能进行下一步(比如Shuffle),所以有宽依赖就需要划分一个阶段任务划分RDD任务切分中间分为:Application、Job、Stage和TaskApplication:初始化一个SparkContext即生成一个ApplicationJob:一个Action算子就会生成一个JobStage(阶段):根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。Task(任务):Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。注意:Application->Job->Stage->Task每一层都是1对n的关系。

2020-2-18 1768 0
未分类

缓存机制的产生当Spakr在执行过程中,其中有某个机器宕机,造成数据丢失,在只有血缘机制的情况下,它会去从产生RDD再开始计算,因为没有数据的缓存。所以当血缘关系特别长的时候,就需要加入缓存机制.缓存机制RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。persist()&cache()persist()源码这个方法可以传入参数,定义缓存保存的级别/**PersistthisRDDwiththedefaultstoragelevel(`MEMORY_ONLY`).*/defpersist():this.type=persist(StorageLevel.MEMORY_ONLY)cache()源码可以发现cache()底层调用了persist()方法。/**PersistthisRDDwiththedefaultstoragelevel(`MEMORY_ONLY`).*/defcache():this.type=persist()缓存级别org/apache/spark/storage/StorageLevel.scala里面数字代表缓存份数,可缓存到内存、磁盘。objectStorageLevel{valNONE=newStorageLevel(false,false,false,false)valDISK_ONLY=newStorageLevel(true,false,false,false)valDISK_ONLY_2=newStorageLevel(true,false,false,false,2)valMEMORY_ONLY=newStorageLevel(false,true,false,true)valMEMORY_ONLY_2=newStorageLevel(false,true,false,true,2)valMEMORY_ONLY_SER=newStorageLevel(false,true,false,false)valMEMORY_ONLY_SER_2=newStorageLevel(false,true,false,false,2)valMEMORY_AND_DISK=newStorageLevel(true,true,false,true)valMEMORY_AND_DISK_2=newStorageLevel(true,true,false,true,2)valMEMORY_AND_DISK_SER=newStorageLevel(true,true,false,false)valMEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,false,2)valOFF_HEAP=newStorageLevel(true,true,true,false,1)缓存机制的体现不做缓存,多次打印,发现结果都不一样遇到行动算子才会计算valrdd=sc.makeRDD(Array("bigdataboy"))valnocache=rdd.map(_+System.currentTimeMillis)println(nocache.collect().mkString(","))println(nocache.collect().mkString(","))println(nocache.collect().mkString(","))----------------------------------------bigdataboy1582002800524bigdataboy1582002800634bigdataboy1582002800689加上缓存,发现是一样的valrdd=sc.makeRDD(Array("bigdataboy"))//加上这个RRD的缓存valcache=rdd.map(_+System.currentTimeMillis).cacheprintln(cache.collect().mkString(","))println(cache.collect().mkString(","))println(cache.collect().mkString(","))println(cache.toDebugString)----------------------------bigdataboy1582003008270bigdataboy1582003008270bigdataboy1582003008270(16)MapPartitionsRDD[1]atmapatRDD缓存.scala:18[MemoryDeserialized1xReplicated]//这就是血缘里的缓存|CachedPartitions:16;MemorySize:352.0B;ExternalBlockStoreSize:0.0B;DiskSize:0.0B|ParallelCollectionRDD[0]atmakeRDDatRDD缓存.scala:17[MemoryDeserialized1xReplicated]

未分类

累加器使用场景当分析一个日志文件,如果我们想计算文件中所有空行的数量,这时就可以使用累加器更方便的统计一个小例子统计数组空元素的个数valstrRDD=sc.makeRDD(List("Hadoop","Java","","Python",""))//定义结果varres=0strRDD.foreach{//是空就加1str=>(if(str.equals("")){res+=1})}println(s"空元素有$res个")}运行结果这是错误的结果空元素有0个出现0的原因分析RDD里面的数据是在不同的分区里面,一个分区会发给一个Executor执行,问题就出在这里,当每个Executor累加完成,每个Executor又怎么把结果组合起来,所以最主要的原因就是varres=0在Driver里面,而作为累加的逻辑在Executor里面,它们无法互通。自带的累加器Spark自带了一些累加器longAccumulator:long数值的累加doubleAccumulator:double数值得累加collectionAccumulator:空列表的,增加元素的方式累加valconf=newSparkConf().setMaster("local[*]").setAppName("ljq")valsc=newSparkContext(conf)//定义一个long类型的累加器varres=sc.longAccumulatorvalstrRDD=sc.makeRDD(List("Hadoop","Java","","Python",""))strRDD.foreach{//调用add()方法累加str=>(if(str.equals("")){res.add(1)})}println(s"空元素有${res.value}个")----------------------------空元素有2个自定义累加器首先分析一下longAccumulator()源码里面只是创建一个对象,注册累加器,然后返回累加器deflongAccumulator:LongAccumulator={//创建LongAccumulator对象valacc=newLongAccumulator//注册累加器,开始Spark的累加器register(acc)acc}继续查看创建的LongAccumulator对象源码会发现继承了AccumulatorV2接口,查看AccumulatorV2源码继承AccumulatorV2抽象类,自定义累加器需要重写这些方法abstractclassAccumulatorV2[IN,OUT]extendsSerializable{/***Returnsifthisaccumulatoriszerovalueornot.e.g.foracounteraccumulator,0iszero*value;foralistaccumulator,Niliszerovalue.*/defisZero:Boolean/***Createsanewcopyofthisaccumulator.*/defcopy():AccumulatorV2[IN,OUT]/***Resetsthisaccumulator,whichiszerovalue.i.e.call`isZero`must*returntrue.*/defreset():Unit/***Takestheinputsandaccumulates.e.g.itcanbeasimple`+=`forcounteraccumulator.*/defadd(v:IN):Unit/***Mergesanothersame-typeaccumulatorintothisoneandupdateitsstate,i.e.thisshouldbe*merge-in-place.*/defmerge(other:AccumulatorV2[IN,OUT]):Unit/***Definesthecurrentvalueofthisaccumulator*/defvalue:OUT}自定义累加器可以模仿着longAccumulator来写classMyAccumulatorV2extendsAccumulatorV2[String,util.ArrayList[String]]{//累加器存数据数组varwordArrays=newutil.ArrayList[String]()//判断累加器是否是初始化状态overridedefisZero:Boolean=wordArrays.size()==0//复制累加器对象overridedefcopy():AccumulatorV2[String,util.ArrayList[String]]={newMyAccumulatorV2()}//重置累加器overridedefreset():Unit={wordArrays.clear()}//向累加器放数据overridedefadd(v:String):Unit={if(v.contains("h")){wordArrays.add(v)}}//把其他分区的累加器的数据合并overridedefmerge(other:AccumulatorV2[String,util.ArrayList[String]]):Unit={wordArrays.addAll(other.value)}//返回累加器的结果overridedefvalue:util.ArrayList[String]=wordArrays}使用valconf=newSparkConf().setMaster("local[*]").setAppName("ljq")valsc=newSparkContext(conf)//创建累加器对象valres=newMyAccumulatorV2//向Spark注册累加器sc.register(res)valstrRDD=sc.makeRDD(List("hadoop","Java","hbase","Python","hive"))//因为累加条件累加器add()方法已经定义,所以直接添加strRDD.foreach(res.add)println(s"含h的单词是${res.value}")---------------------------------------含h的单词是[hbase,hadoop,Python,hive]

未分类

传递函数的场景当我自己封装好一个RDD的算子,需要使用,此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要把对象序列化的。封装一个算子这个是传递一个变量过滤掉RDD小于num的数值,并返回一个RDD。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(_>num)}}使用这个算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法,传入5valmyFilter=newMyFilter(5)//调用算子方法valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)运行会发现报一个错对象没有实例化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@5462f059)修改封装的方法报错的主要原因是Executor端拿不到需要变量、或者方法,所以需要序列化传送,就有了把传进来的变量赋值给方法内部的变量这样的操作,下面还有另外的操作。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={valn=numrdd.filter(_>n)}}封装算子2这是传递一个方法只是把上面的判断另外写成了一个函数classMyFilter(num:Int){//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}使用算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法valmyFilter=newMyFilter(5)valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)也会报同一个错对象没有序列化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@27f643e)因为装饰者模式的存在,自己封装的操作只需要继承特质Serializable就好,上面的传递变量也可以这样。classMyFilter(num:Int)extendsSerializable{//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}