问题出现这个异常已在SpakrSQL2.0.1中修复使用IDEA配置好SparkSQL2.0.0开始使用//构建会话valss:SparkSession=SparkSession.builder.master("local").appName("WordCount").getOrCreate()//读取文件valjsonData:DataFrame=ss.read.json("indata/data.json")运行会产生一个路径异常,这个路径SparkSQL内置的Hive创表保存数据默认路径的。java.net.URISyntaxException:RelativepathinabsoluteURI:file:E:/IdeaProject/SparkDemo/spark-warehouse问题解决//构建SparkSQL会话valss:SparkSession=SparkSession.builder.master("local").appName("WordCount").config("spark.sql.warehouse.dir","项目根路径").getOrCreate()问题解析查看源码发现,这个问题出在SQLConf.scala里面源码目录:org.apache.spark.sql.internal.SQLConf提取相关源码//默认配置模板valWAREHOUSE_PATH=SQLConfigBuilder("spark.sql.warehouse.dir").doc("Thedefaultlocationformanageddatabasesandtables.").stringConf.createWithDefault("file:${system:user.dir}/spark-warehouse")//替换模板里的路径,问题就出现这个替换的上defwarehousePath:String={getConf(WAREHOUSE_PATH).replace("${system:user.dir}",System.getProperty("user.dir"))}我们可以把这段逻辑简化一下使用这段代码System.getProperty("user.dir")替换路径,输出结果发现其中的含有\,这就是异常的原因valpath="file:${system:user.dir}/spark-warehouse".replace("${system:user.dir}",System.getProperty("user.dir"))println(path)--------------file:E:\IdeaProject\SparkDemo/spark-warehouse官方修复动态管方已在SparkSQL2.0.1版本中修复这个BUG修复代码如下valWAREHOUSE_PATH=SQLConfigBuilder("spark.sql.warehouse.dir").doc("Thedefaultlocationformanageddatabasesandtables.").stringConf.createWithDefault("${system:user.dir}/spark-warehouse")defwarehousePath:String={newPath(getConf(WAREHOUSE_PATH).replace("${system:user.dir}",System.getProperty("user.dir"))).toString}
什么是SparkSQLSparkSQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。SparkSQL的特点易整合统一的数据访问方式兼容Hive标准的数据连接什么是DataFrame与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待,DataFrame也是懒执行的。性能上比RDD要高,主要原因:优化的执行计划:查询计划通过Sparkcatalystoptimiser进行优化。什么是DataSet是DataframeAPI的一个扩展,是Spark最新的数据抽象。用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。Dataframe是Dataset的特列,DataFrame=Dataset[Row],所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。
Lineage(血统容错机制)Spark会将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。查看RDD的LineagetoDebugString每一步RDD都会记录着前面产生RDD的所有操作,以便产生错误的时候追溯回去重新计算。valconf=newSparkConf().setMaster("local[*]").setAppName("a")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4))valmapRDD=intRDD.map(_+2)valkvRDD=mapRDD.map(("a",_))valreRDD=kvRDD.reduceByKey(_+_)println(reRDD.toDebugString)-------------------(16)ShuffledRDD[3]atreduceByKeyat算子.scala:123[]+-(16)MapPartitionsRDD[2]atmapat算子.scala:122[]|MapPartitionsRDD[1]atmapat算子.scala:121[]|ParallelCollectionRDD[0]atmakeRDDat算子.scala:120[]查看依赖类型dependenciesvalintRDD=sc.makeRDD(List(1,2,3,4))valmapRDD=intRDD.map(_+2)valkvRDD=mapRDD.map(("a",_))valreRDD=kvRDD.reduceByKey(_+_)println(reRDD.dependencies)---------------------------List(org.apache.spark.ShuffleDependency@7923f5b3)依赖机制依赖值得是子RDD和父RDD之间的关系RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrowdependency)和宽依赖(widedependency)。窄依赖窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女`宽依赖宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生阶段划分阶段划分指Job在执行过程中,是否需要等待某一操作执行完成才能进行下一步(比如Shuffle),所以有宽依赖就需要划分一个阶段任务划分RDD任务切分中间分为:Application、Job、Stage和TaskApplication:初始化一个SparkContext即生成一个ApplicationJob:一个Action算子就会生成一个JobStage(阶段):根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。Task(任务):Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。注意:Application->Job->Stage->Task每一层都是1对n的关系。
检查点机制通过血缘机制做容错的辅助,血缘过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到文件系统(可以是本地或者HDFS或者是其他)实现了RDD的检查点功能。使用为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到设置的目录中,在checkpoint()的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。首先需要设置存储路径,一般为HDFS上SparkContext.setCheckpointDir(路径)valconf=newSparkConf().setMaster("local[*]").setAppName("c")valsc=newSparkContext(conf)//设置检查点存储的路径sc.setCheckpointDir("checkdir")设置需要做检查点的RDDvalstrRDD=sc.makeRDD(List("a","b","c","c"),2)valkvRDD=strRDD.map((_,1))kvRDD.checkpoint()valres=kvRDD.reduceByKey(_+_)res.checkpoint()valr=res.collect()
缓存机制的产生当Spakr在执行过程中,其中有某个机器宕机,造成数据丢失,在只有血缘机制的情况下,它会去从产生RDD再开始计算,因为没有数据的缓存。所以当血缘关系特别长的时候,就需要加入缓存机制.缓存机制RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。persist()&cache()persist()源码这个方法可以传入参数,定义缓存保存的级别/**PersistthisRDDwiththedefaultstoragelevel(`MEMORY_ONLY`).*/defpersist():this.type=persist(StorageLevel.MEMORY_ONLY)cache()源码可以发现cache()底层调用了persist()方法。/**PersistthisRDDwiththedefaultstoragelevel(`MEMORY_ONLY`).*/defcache():this.type=persist()缓存级别org/apache/spark/storage/StorageLevel.scala里面数字代表缓存份数,可缓存到内存、磁盘。objectStorageLevel{valNONE=newStorageLevel(false,false,false,false)valDISK_ONLY=newStorageLevel(true,false,false,false)valDISK_ONLY_2=newStorageLevel(true,false,false,false,2)valMEMORY_ONLY=newStorageLevel(false,true,false,true)valMEMORY_ONLY_2=newStorageLevel(false,true,false,true,2)valMEMORY_ONLY_SER=newStorageLevel(false,true,false,false)valMEMORY_ONLY_SER_2=newStorageLevel(false,true,false,false,2)valMEMORY_AND_DISK=newStorageLevel(true,true,false,true)valMEMORY_AND_DISK_2=newStorageLevel(true,true,false,true,2)valMEMORY_AND_DISK_SER=newStorageLevel(true,true,false,false)valMEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,false,2)valOFF_HEAP=newStorageLevel(true,true,true,false,1)缓存机制的体现不做缓存,多次打印,发现结果都不一样遇到行动算子才会计算valrdd=sc.makeRDD(Array("bigdataboy"))valnocache=rdd.map(_+System.currentTimeMillis)println(nocache.collect().mkString(","))println(nocache.collect().mkString(","))println(nocache.collect().mkString(","))----------------------------------------bigdataboy1582002800524bigdataboy1582002800634bigdataboy1582002800689加上缓存,发现是一样的valrdd=sc.makeRDD(Array("bigdataboy"))//加上这个RRD的缓存valcache=rdd.map(_+System.currentTimeMillis).cacheprintln(cache.collect().mkString(","))println(cache.collect().mkString(","))println(cache.collect().mkString(","))println(cache.toDebugString)----------------------------bigdataboy1582003008270bigdataboy1582003008270bigdataboy1582003008270(16)MapPartitionsRDD[1]atmapatRDD缓存.scala:18[MemoryDeserialized1xReplicated]//这就是血缘里的缓存|CachedPartitions:16;MemorySize:352.0B;ExternalBlockStoreSize:0.0B;DiskSize:0.0B|ParallelCollectionRDD[0]atmakeRDDatRDD缓存.scala:17[MemoryDeserialized1xReplicated]
累加器使用场景当分析一个日志文件,如果我们想计算文件中所有空行的数量,这时就可以使用累加器更方便的统计一个小例子统计数组空元素的个数valstrRDD=sc.makeRDD(List("Hadoop","Java","","Python",""))//定义结果varres=0strRDD.foreach{//是空就加1str=>(if(str.equals("")){res+=1})}println(s"空元素有$res个")}运行结果这是错误的结果空元素有0个出现0的原因分析RDD里面的数据是在不同的分区里面,一个分区会发给一个Executor执行,问题就出在这里,当每个Executor累加完成,每个Executor又怎么把结果组合起来,所以最主要的原因就是varres=0在Driver里面,而作为累加的逻辑在Executor里面,它们无法互通。自带的累加器Spark自带了一些累加器longAccumulator:long数值的累加doubleAccumulator:double数值得累加collectionAccumulator:空列表的,增加元素的方式累加valconf=newSparkConf().setMaster("local[*]").setAppName("ljq")valsc=newSparkContext(conf)//定义一个long类型的累加器varres=sc.longAccumulatorvalstrRDD=sc.makeRDD(List("Hadoop","Java","","Python",""))strRDD.foreach{//调用add()方法累加str=>(if(str.equals("")){res.add(1)})}println(s"空元素有${res.value}个")----------------------------空元素有2个自定义累加器首先分析一下longAccumulator()源码里面只是创建一个对象,注册累加器,然后返回累加器deflongAccumulator:LongAccumulator={//创建LongAccumulator对象valacc=newLongAccumulator//注册累加器,开始Spark的累加器register(acc)acc}继续查看创建的LongAccumulator对象源码会发现继承了AccumulatorV2接口,查看AccumulatorV2源码继承AccumulatorV2抽象类,自定义累加器需要重写这些方法abstractclassAccumulatorV2[IN,OUT]extendsSerializable{/***Returnsifthisaccumulatoriszerovalueornot.e.g.foracounteraccumulator,0iszero*value;foralistaccumulator,Niliszerovalue.*/defisZero:Boolean/***Createsanewcopyofthisaccumulator.*/defcopy():AccumulatorV2[IN,OUT]/***Resetsthisaccumulator,whichiszerovalue.i.e.call`isZero`must*returntrue.*/defreset():Unit/***Takestheinputsandaccumulates.e.g.itcanbeasimple`+=`forcounteraccumulator.*/defadd(v:IN):Unit/***Mergesanothersame-typeaccumulatorintothisoneandupdateitsstate,i.e.thisshouldbe*merge-in-place.*/defmerge(other:AccumulatorV2[IN,OUT]):Unit/***Definesthecurrentvalueofthisaccumulator*/defvalue:OUT}自定义累加器可以模仿着longAccumulator来写classMyAccumulatorV2extendsAccumulatorV2[String,util.ArrayList[String]]{//累加器存数据数组varwordArrays=newutil.ArrayList[String]()//判断累加器是否是初始化状态overridedefisZero:Boolean=wordArrays.size()==0//复制累加器对象overridedefcopy():AccumulatorV2[String,util.ArrayList[String]]={newMyAccumulatorV2()}//重置累加器overridedefreset():Unit={wordArrays.clear()}//向累加器放数据overridedefadd(v:String):Unit={if(v.contains("h")){wordArrays.add(v)}}//把其他分区的累加器的数据合并overridedefmerge(other:AccumulatorV2[String,util.ArrayList[String]]):Unit={wordArrays.addAll(other.value)}//返回累加器的结果overridedefvalue:util.ArrayList[String]=wordArrays}使用valconf=newSparkConf().setMaster("local[*]").setAppName("ljq")valsc=newSparkContext(conf)//创建累加器对象valres=newMyAccumulatorV2//向Spark注册累加器sc.register(res)valstrRDD=sc.makeRDD(List("hadoop","Java","hbase","Python","hive"))//因为累加条件累加器add()方法已经定义,所以直接添加strRDD.foreach(res.add)println(s"含h的单词是${res.value}")---------------------------------------含h的单词是[hbase,hadoop,Python,hive]
传递函数的场景当我自己封装好一个RDD的算子,需要使用,此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要把对象序列化的。封装一个算子这个是传递一个变量过滤掉RDD小于num的数值,并返回一个RDD。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(_>num)}}使用这个算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法,传入5valmyFilter=newMyFilter(5)//调用算子方法valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)运行会发现报一个错对象没有实例化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@5462f059)修改封装的方法报错的主要原因是Executor端拿不到需要变量、或者方法,所以需要序列化传送,就有了把传进来的变量赋值给方法内部的变量这样的操作,下面还有另外的操作。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={valn=numrdd.filter(_>n)}}封装算子2这是传递一个方法只是把上面的判断另外写成了一个函数classMyFilter(num:Int){//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}使用算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法valmyFilter=newMyFilter(5)valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)也会报同一个错对象没有序列化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@27f643e)因为装饰者模式的存在,自己封装的操作只需要继承特质Serializable就好,上面的传递变量也可以这样。classMyFilter(num:Int)extendsSerializable{//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}
行动算子行动算子:是把RDD的对象转换成Scala的相关类型。转换算子:就是RDD之间相互转换的算子collect()作用:在驱动程序中,以数组的形式返回RDD的所有元素。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组valresRDD:Array[Int]=listRDD.collect()reduce(f:(T,T)=>T)作用:通过函数把数据两两进行聚合,先聚合分区内数据,再聚合分区间数据。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组如果是K-V类型,也是两两传入K-V对进行聚合。valres:Int=listRDD.reduce(_+_)println(res)---------------55first()作用:返回RDD中第一个数据take(num:Int)作用:返回该RDD的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.take(3)println(res.mkString(","))---------------5,4,7takeOrdered(num:Int)作用:返回该RDD排序后的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.takeOrdered(3)println(res.mkString(","))---------------1,4,5count()作用:返回RDD中元素的个数统计vallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))valres:Long=listRDD.count()println(res)------------5aggregate(zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U)作用:使分区内和分区间可以使用不同的函数聚合。zeroValue:两两聚合时第一次调用的值。seqOp:分区内聚合的函数。combOp:分区间聚合的函数。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次。valres:Int=listRDD.aggregate(0)(_+_,_+_)println(res)------------55fold(zeroValue:T)(op:(T,T)=>T)作用:是aggregate()的精简版,分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:两两聚合时第一次调用的值。op:分区内和分区间聚合的函数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次,跟aggregate()一样valres:Int=listRDD.fold(0)(_+_)println(res)---------------55文件保存把RDD数据通过文件的方式保存saveAsTextFile(路径):将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsObjectFile(路径):将数据集中的元素以Hadoopsequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。saveAsSequenceFile(路径):用于将RDD中的元素序列化成对象,存储到文件中。countByKey()作用:针对(K,V)类型的RDD,返回一个(K,Int)的Map,表示每一个K对应的元素个数。创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("a","b","c","a"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))统计打印valslMap:collection.Map[String,Long]=kvRDD.countByKey()println(slMap)--------------Map(a->2,b->1,c->1)foreach()作用:这个foreach()是对RDD操作,没有返回值。与Scala里的用法一样,只是使用的对象不一样。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)遍历打印listRDD.foreach(println)--------------52314
Key-Value类型算子都在org.apache.spark.rdd.PairRDDFunctions里面partitionBy(partitioner:Partitioner)作用:按照RDD的Key进行分区操作,如果原有的分区数和现有的分区数是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程。参数partitioner:分区器,需要导入org.apache.spark.HashPartitioner需求:创建一个4个分区的RDD,对其重新分区延展:自定义分区器(重点)创建RDDvallistRDD:RDD[(String,String)]=sc.makeRDD(List(("aa","a"),("ab","b"),("bc","c"),("bd","d")),4)转换RDDHashPartitioner分区器最终导致数据在哪个分区,有它自己的算法,所以我们不能控制,就需要自定义分区器valpartRDD:RDD[(String,String)]=listRDD.partitionBy(newHashPartitioner(3))结果输出看不出来是把Key进行了什么算法分区的partRDD.saveAsTextFile("outpath")groupByKey()作用:把有相同Key的Value聚合在一起,最后生成生成一个K-V集合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.groupByKey()结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,CompactBuffer(1))(Scala,CompactBuffer(1))(Python,CompactBuffer(1,1))reduceByKey(func:(V,V)=>V,[numPartitions:Int])作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,第二个是聚合之后的分区数func:(V,V)=>V:有相同k的v,这个v之间的运算聚合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.reduceByKey(_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,1)(Scala,1)(Python,2)aggregateByKey(zeroValue:U)(seqOp:(U,V)=>U,combOp:(U,U)=>U)作用:能先把分区内(seqOp)有相同Key的数据进行聚合,然后再进行分区间(combOp)有相同Key的数据进行聚合。就是能定义分区内和分区间进行不同的聚合方式.zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。seqOp:分区内做聚合的函数combOp:分区间做聚合的函数需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.aggregateByKey(0)(List(_,_).max,_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,13),(a,3),(c,5)foldByKey(zeroValue:U)(func:(V,V)=>V))作用:是aggregateByKey()的精简版,使分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。func:做聚合的函数需求:创建一个pairRDD,取出不同Key对应pairRDD最大值。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.foldByKey(0)(List(_,_).max)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,8),(a,3),(c,5)combineByKey(参数是:三个函数)作用:对相同K,把V合并成一个集合。这个方法是aggregateByKey()的完全版。初始值的大小,格式需要使用函数进行处理。createCombiner:V=>C:对第一个V进行初始化处理,大小,格式等。mergeValue:(C,V):这是分区内的聚合,C的格式是初始化之后的格式,V是两两操作的第二个值mergeCombiners:(C,C)=>C:这是分区间的处理需求:创建一个pairRDD,把相同的Key的Value相加,并记录相同的Key有多少个。创建RDDvalkvRDD=sc.makeRDD(List(("a",11),("a",31),("b",51),("b",81),("c",51),("b",71)),2)转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.combineByKey(//对第一个V做初始化格式处理,形成元组,为了方便记录次数(_,1),//分区内部两两处理,(a:(Int,Int),v)=>(a._1+v,a._2+1),//分区间,和加和,次数加次数(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,a._2+b._2))结果输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))-----------------------(b,(203,3)),(a,(42,2)),(c,(51,1))sortByKey([ascending:Boolean],[numPartitions:Int])作用:通过K-V的K来排序。[ascending:Boolean]:默认升序[numPartitions:Int]:分区数,或者叫任务数创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDD降序valresRDD:RDD[(String,Int)]=kvRDD.sortByKey(false)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(c,5),(b,5),(b,8),(b,7),(a,1),(a,3)mapValues(f:V=>U):RDD[(K,U))作用:传入函数,只对K-V里的V做处理需求:给每个K-V的V加上¥符号。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,String)]=kvRDD.mapValues(_+"¥")结果输出valr:Array[(String,String)]=resRDD.collect()println(r.mkString(","))------------------------(a,1¥),(a,3¥),(b,5¥),(b,8¥),(c,5¥),(b,7¥)join(other:RDD[(K,W)])作用:组合。在类型为(K,V)和(K,W)的两个RDD上调用。两个RDD,以相同的K为依据,不同的RDD相互组合V,返回(K,(V,W))形式的RDD。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)),2)valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5)))转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.join(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))------------------------(a,(1,1)),(b,(5,5)),(b,(8,5)),(c,(5,2))cogroup(other:RDD[(K,W)])作用:组合。这个组合更强大,把两个K-VRDD,通果相同的K把V组合起来,在同一RDD内V在一个迭代类型内,然后再把另外一个RDD组合的迭代类型一起组合成一个大K-V的V。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)))valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5),("b",5)))转换RDDvalresRDD:RDD[(String,(Iterable[Int],Iterable[Int]))]=kvRDD.cogroup(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Iterable[Int],Iterable[Int]))]=resRDD.collect()r.foreach(println)------------------------(a,(CompactBuffer(1),CompactBuffer(1)))(b,(CompactBuffer(5,8),CompactBuffer(5,5)))(c,(CompactBuffer(5),CompactBuffer(2)))(x,(CompactBuffer(1),CompactBuffer()))
groupByKey()groupByKey:按照key进行分组,直接进行shuffle。reduceByKey()reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].defreduceByKey(partitioner:Partitioner,func:(V,V)=>V):RDD[(K,V)]=self.withScope{combineByKeyWithClassTag[V]((v:V)=>v,func,func,partitioner)}使用建议reduceByKey和groupByKey有一定的共同之处,建议在不影响业务逻辑的情况下优先使用reduceByKey。如果shuffle之前有一个combine(预聚合)操作,这样的shuffle操作的性能并不低
双Value类型双Value类型是指两个RDD之间的操作并集union(other:RDD[T])作用:并集;对源RDD和参数RDD求并集后返回一个新的RDD创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(2to7)转换RDDvalresRDD:RDD[Int]=listRDD.union(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2,3,4,5,2,3,4,5,6,7交集intersection(other:RDD[T],[numPartitions:Int])作用:交集,返回两个RDD中相同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.intersection(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------3,4,5差集subtract(other:RDD[T],[numPartitions:Int])作用:差集;返回源RDD中与参数RDD中不同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.subtract(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2intersection(other:RDD[T])作用:笛卡尔积(尽量避免使用)创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.cartesian(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(1,6),(1,7),(2,5),(2,6),(2,7),(3,5),(3,6),(3,7)zip(other:RDD[T])作用:将两个RDD按照元素顺序组合成Key/Value形式的RDD,两个RDD的partition数量以及元素数量都相同,否则会抛出异常。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.zip(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(2,6),(3,7)
自定义分区器在使用partitionBy()函时我们需要传一个分区器。在有些场景下,我们需要指定某些数据放在一个分区,这时自带的分区器不能满足我们的需求。源码分析org.apache.spark.HashPartitioner分区器classHashPartitioner(partitions:Int)extendsPartitioner{require(partitions>=0,s"Numberofpartitions($partitions)cannotbenegative.")defnumPartitions:Int=partitionsdefgetPartition(key:Any):Int=keymatch{casenull=>0case_=>Utils.nonNegativeMod(key.hashCode,numPartitions)}overridedefequals(other:Any):Boolean=othermatch{caseh:HashPartitioner=>h.numPartitions==numPartitionscase_=>false}overridedefhashCode:Int=numPartitions}我们看见这个类继承了Partitioner(),继续查看源码abstractclassPartitionerextendsSerializable{//创建出来的分区数。defnumPartitions:Int//返回给定键的分区编号defgetPartition(key:Any):Int}发现这是一个抽象类,里面只有两个方法,我们就可以对比着HashPartitioner类来看numPartitions():Int:分区数,这个方法没有做处理,就是传进来的值getPartition(key:Any):Int:传入的参数是Key,所以就判断这个Key,返回一个Int来确定数据放在哪个分区。重写Partitioner类把下面这个RDD,把包含a的放在1分区,b放在2分区,其他的放在0分区vallistRDD:RDD[(String,String)]=sc.makeRDD(List(("aa","a"),("ab","b"),("bc","c"),("bd","d"),("dd","d")))自定义分区classMyPart(partitions:Int)extendsPartitioner{//包含a的1分区//包含b的2分区//其他的0分区//传入的参数至少大于三overridedefnumPartitions:Int=partitionsoverridedefgetPartition(key:Any):Int={//这里注意改变类型,然后才能使用响应的方法if(key.toString.indexOf("a")!=-1){1}elseif(key.toString.indexOf("b")!=-1){2}else{0}}使用自定义的分区器valpartRDD:RDD[(String,String)]=listRDD.partitionBy(newMyPart(3))查看分区结果partRDD.saveAsTextFile("outpath")