哔哔大数据

传递函数的场景当我自己封装好一个RDD的算子,需要使用,此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要把对象序列化的。封装一个算子这个是传递一个变量过滤掉RDD小于num的数值,并返回一个RDD。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(_>num)}}使用这个算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法,传入5valmyFilter=newMyFilter(5)//调用算子方法valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)运行会发现报一个错对象没有实例化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@5462f059)修改封装的方法报错的主要原因是Executor端拿不到需要变量、或者方法,所以需要序列化传送,就有了把传进来的变量赋值给方法内部的变量这样的操作,下面还有另外的操作。classMyFilter(num:Int){//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={valn=numrdd.filter(_>n)}}封装算子2这是传递一个方法只是把上面的判断另外写成了一个函数classMyFilter(num:Int){//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}使用算子valconf=newSparkConf().setMaster("local[*]").setAppName("p")valsc=newSparkContext(conf)valintRDD=sc.makeRDD(List(1,2,3,4,5,6,7,8,9))//实例化封装的方法valmyFilter=newMyFilter(5)valresRDD:RDD[Int]=myFilter.filterInt(intRDD)resRDD.foreach(println)也会报同一个错对象没有序列化-objectnotserializable(class:cn.bigdata.MyFilter,value:cn.bigdata.MyFilter@27f643e)因为装饰者模式的存在,自己封装的操作只需要继承特质Serializable就好,上面的传递变量也可以这样。classMyFilter(num:Int)extendsSerializable{//判断大小返回布尔值defbool(i:Int):Boolean={i>num}//过滤掉RDD小于num的数值deffilterInt(rdd:RDD[Int]):RDD[Int]={rdd.filter(bool(_))}}

哔哔大数据

行动算子行动算子:是把RDD的对象转换成Scala的相关类型。转换算子:就是RDD之间相互转换的算子collect()作用:在驱动程序中,以数组的形式返回RDD的所有元素。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组valresRDD:Array[Int]=listRDD.collect()reduce(f:(T,T)=>T)作用:通过函数把数据两两进行聚合,先聚合分区内数据,再聚合分区间数据。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组如果是K-V类型,也是两两传入K-V对进行聚合。valres:Int=listRDD.reduce(_+_)println(res)---------------55first()作用:返回RDD中第一个数据take(num:Int)作用:返回该RDD的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.take(3)println(res.mkString(","))---------------5,4,7takeOrdered(num:Int)作用:返回该RDD排序后的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.takeOrdered(3)println(res.mkString(","))---------------1,4,5count()作用:返回RDD中元素的个数统计vallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))valres:Long=listRDD.count()println(res)------------5aggregate(zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U)作用:使分区内和分区间可以使用不同的函数聚合。zeroValue:两两聚合时第一次调用的值。seqOp:分区内聚合的函数。combOp:分区间聚合的函数。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次。valres:Int=listRDD.aggregate(0)(_+_,_+_)println(res)------------55fold(zeroValue:T)(op:(T,T)=>T)作用:是aggregate()的精简版,分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:两两聚合时第一次调用的值。op:分区内和分区间聚合的函数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次,跟aggregate()一样valres:Int=listRDD.fold(0)(_+_)println(res)---------------55文件保存把RDD数据通过文件的方式保存saveAsTextFile(路径):将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsObjectFile(路径):将数据集中的元素以Hadoopsequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。saveAsSequenceFile(路径):用于将RDD中的元素序列化成对象,存储到文件中。countByKey()作用:针对(K,V)类型的RDD,返回一个(K,Int)的Map,表示每一个K对应的元素个数。创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("a","b","c","a"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))统计打印valslMap:collection.Map[String,Long]=kvRDD.countByKey()println(slMap)--------------Map(a->2,b->1,c->1)foreach()作用:这个foreach()是对RDD操作,没有返回值。与Scala里的用法一样,只是使用的对象不一样。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)遍历打印listRDD.foreach(println)--------------52314

哔哔大数据

Key-Value类型算子都在org.apache.spark.rdd.PairRDDFunctions里面partitionBy(partitioner:Partitioner)作用:按照RDD的Key进行分区操作,如果原有的分区数和现有的分区数是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程。参数partitioner:分区器,需要导入org.apache.spark.HashPartitioner需求:创建一个4个分区的RDD,对其重新分区延展:自定义分区器(重点)创建RDDvallistRDD:RDD[(String,String)]=sc.makeRDD(List(("aa","a"),("ab","b"),("bc","c"),("bd","d")),4)转换RDDHashPartitioner分区器最终导致数据在哪个分区,有它自己的算法,所以我们不能控制,就需要自定义分区器valpartRDD:RDD[(String,String)]=listRDD.partitionBy(newHashPartitioner(3))结果输出看不出来是把Key进行了什么算法分区的partRDD.saveAsTextFile("outpath")groupByKey()作用:把有相同Key的Value聚合在一起,最后生成生成一个K-V集合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.groupByKey()结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,CompactBuffer(1))(Scala,CompactBuffer(1))(Python,CompactBuffer(1,1))reduceByKey(func:(V,V)=>V,[numPartitions:Int])作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,第二个是聚合之后的分区数func:(V,V)=>V:有相同k的v,这个v之间的运算聚合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.reduceByKey(_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,1)(Scala,1)(Python,2)aggregateByKey(zeroValue:U)(seqOp:(U,V)=>U,combOp:(U,U)=>U)作用:能先把分区内(seqOp)有相同Key的数据进行聚合,然后再进行分区间(combOp)有相同Key的数据进行聚合。就是能定义分区内和分区间进行不同的聚合方式.zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。seqOp:分区内做聚合的函数combOp:分区间做聚合的函数需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.aggregateByKey(0)(List(_,_).max,_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,13),(a,3),(c,5)foldByKey(zeroValue:U)(func:(V,V)=>V))作用:是aggregateByKey()的精简版,使分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。func:做聚合的函数需求:创建一个pairRDD,取出不同Key对应pairRDD最大值。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.foldByKey(0)(List(_,_).max)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,8),(a,3),(c,5)combineByKey(参数是:三个函数)作用:对相同K,把V合并成一个集合。这个方法是aggregateByKey()的完全版。初始值的大小,格式需要使用函数进行处理。createCombiner:V=>C:对第一个V进行初始化处理,大小,格式等。mergeValue:(C,V):这是分区内的聚合,C的格式是初始化之后的格式,V是两两操作的第二个值mergeCombiners:(C,C)=>C:这是分区间的处理需求:创建一个pairRDD,把相同的Key的Value相加,并记录相同的Key有多少个。创建RDDvalkvRDD=sc.makeRDD(List(("a",11),("a",31),("b",51),("b",81),("c",51),("b",71)),2)转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.combineByKey(//对第一个V做初始化格式处理,形成元组,为了方便记录次数(_,1),//分区内部两两处理,(a:(Int,Int),v)=>(a._1+v,a._2+1),//分区间,和加和,次数加次数(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,a._2+b._2))结果输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))-----------------------(b,(203,3)),(a,(42,2)),(c,(51,1))sortByKey([ascending:Boolean],[numPartitions:Int])作用:通过K-V的K来排序。[ascending:Boolean]:默认升序[numPartitions:Int]:分区数,或者叫任务数创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDD降序valresRDD:RDD[(String,Int)]=kvRDD.sortByKey(false)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(c,5),(b,5),(b,8),(b,7),(a,1),(a,3)mapValues(f:V=>U):RDD[(K,U))作用:传入函数,只对K-V里的V做处理需求:给每个K-V的V加上¥符号。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,String)]=kvRDD.mapValues(_+"¥")结果输出valr:Array[(String,String)]=resRDD.collect()println(r.mkString(","))------------------------(a,1¥),(a,3¥),(b,5¥),(b,8¥),(c,5¥),(b,7¥)join(other:RDD[(K,W)])作用:组合。在类型为(K,V)和(K,W)的两个RDD上调用。两个RDD,以相同的K为依据,不同的RDD相互组合V,返回(K,(V,W))形式的RDD。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)),2)valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5)))转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.join(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))------------------------(a,(1,1)),(b,(5,5)),(b,(8,5)),(c,(5,2))cogroup(other:RDD[(K,W)])作用:组合。这个组合更强大,把两个K-VRDD,通果相同的K把V组合起来,在同一RDD内V在一个迭代类型内,然后再把另外一个RDD组合的迭代类型一起组合成一个大K-V的V。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)))valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5),("b",5)))转换RDDvalresRDD:RDD[(String,(Iterable[Int],Iterable[Int]))]=kvRDD.cogroup(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Iterable[Int],Iterable[Int]))]=resRDD.collect()r.foreach(println)------------------------(a,(CompactBuffer(1),CompactBuffer(1)))(b,(CompactBuffer(5,8),CompactBuffer(5,5)))(c,(CompactBuffer(5),CompactBuffer(2)))(x,(CompactBuffer(1),CompactBuffer()))

哔哔大数据

双Value类型双Value类型是指两个RDD之间的操作并集union(other:RDD[T])作用:并集;对源RDD和参数RDD求并集后返回一个新的RDD创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(2to7)转换RDDvalresRDD:RDD[Int]=listRDD.union(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2,3,4,5,2,3,4,5,6,7交集intersection(other:RDD[T],[numPartitions:Int])作用:交集,返回两个RDD中相同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.intersection(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------3,4,5差集subtract(other:RDD[T],[numPartitions:Int])作用:差集;返回源RDD中与参数RDD中不同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.subtract(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2intersection(other:RDD[T])作用:笛卡尔积(尽量避免使用)创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.cartesian(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(1,6),(1,7),(2,5),(2,6),(2,7),(3,5),(3,6),(3,7)zip(other:RDD[T])作用:将两个RDD按照元素顺序组合成Key/Value形式的RDD,两个RDD的partition数量以及元素数量都相同,否则会抛出异常。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.zip(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(2,6),(3,7)

哔哔大数据

前言RDD整体上分为Value类型和Key-Value类型Value类型map(func)作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个数据valresRDD:RDD[Int]=intRDD.map(_*2)结果输出valr=resRDD.collect().mkString(",")println(r)----------2,4,6,8,10,12,14,16,18,20mapPartitions(func)作用:类似于map,但独立地在RDD的每一个分区上运行,因此func的函数类型必须是(Interator[T])=>Iterator[U]。效率要优于map()但分区数据太大容易造成内存溢出的错误(OOM)需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个分区,所以匿名函数``传入是可迭代类型,返回也要可迭代类型valresRDD:RDD[Int]=intRDD.mapPartitions((iter)=>{//这里的map()是Scala中的函数iter.map(i=>i*2)})结果输出valr=resRDD.collect().mkString(",")println(r)----------2,4,6,8,10,12,14,16,18,20mapPartitionsWithIndex(func)作用:类似于mapPartitions,但func带有一个整数参数表示所在分区的索引值,因此func的函数类型必须是(Int,Interator[T])=>Iterator[U];需求:创建一个RDD,使元素跟所在分区索引形成一个元组组成一个新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个分区valresRDD:RDD[(String,Int)]=intRDD.mapPartitionsWithIndex{//有多个传入参数时,一般使用casecase(num,datas)=>{datas.map(("分区"+num,_))}}结果输出valr=resRDD.collect().mkString(",")println(r)----------(分区1,1),(分区3,2),(分区4,3),(分区6,4),(分区7,5),(分区9,6),(分区11,7),(分区12,8),(分区14,9),(分区15,10)flatMap(func)作用:类似于Scala中的flatMap(),每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)需求:创建一个Array数组中的List元素RDD,映射成Array数组RDD创建RDDvallistRDD:RDD[List[Int]]=sc.makeRDD(Array(List(1,2),List(3,4)))转换RDDvalresRDD:RDD[Int]=listRDD.flatMap(list=>list)结果输出valr=resRDD.collect().mkString(",")println(r)----------1,2,3,4glom()作用:将每一个分区形成一个数组,形成新的RDD类型是RDD[Array[T]]创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,3)转换RDDvalarrayRDD:RDD[Array[Int]]=listRDD.glom()结果输出分区不能平均存放数据时,多余的数据会往后面的分区存放valres=arrayRDD.collect()res.foreach(nums=>println(nums.mkString(",")))------------------------------------------------1,2,34,5,67,8,9,10groupBy()作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器,返回一个元组:RDD[(Int,Iterable[Int])],前一个Int是传入函数返回值。需求:区分奇偶数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[(Int,Iterable[Int])]=listRDD.groupBy(_%2)结果输出valr=resRDD.collect()r.foreach(println)------------------(0,CompactBuffer(2,4,6,8,10))(1,CompactBuffer(1,3,5,7,9))filter(func)作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。需求:过滤奇数,留下偶数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[Int]=listRDD.filter(_%2==0)结果输出valr=resRDD.collect()println(r.mkString(","))------------------2,4,6,8,10sample(withReplacement,fraction,[seed])作用:数据抽样withReplacement:布尔值true:有放回的抽样,采用PoissonSampler抽样器(Poisson分布),false:无放回的抽样,采用BernoulliSampler的抽样器fraction:当为true时,这就是数学中的期望当为false时,这就是概率,值的范围[0,1]seed:随机数种子,不好把握,保持默认就好需求:创建一个RDD(1-10),从中选择不放回抽样创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[Int]=listRDD.sample(false,0.4)结果输出其实每次抽样结果的值和值的个数的个数都不一样,但会保持在一个数学算法的范围里valr=resRDD.collect()println(r.mkString(","))------------------------5,7,9,10distinct([numPartitions])作用:去重。因为去重之后,数据会减少,各个数据原本所在的分区会被洗牌打乱,所以其中就有shuffle操作,参数是去重后所产生的分区数量,可选。需求:去重,指定两个分区。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(6,2,1,2,8,3,5,5,4,1,2,2))转换RDDvalresRDD:RDD[Int]=listRDD.distinct(2)结果输出如果不指定分区,输出的数据会自动排序valr=resRDD.collect()println(r.mkString(","))-----------------------4,6,8,2,1,3,5coalesce(numPartitions,[shuffle])作用:减少分区数。用于大数据集过滤后,提高小数据集的执行效率。参数如果大于原本的分区数,则分区数不变,默认false不进行shuffle操作。需求:减少分区数。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,6)转换RDDvalnewPartition:RDD[Int]=listRDD.coalesce(3)结果输出如果不指定分区,输出的数据会自动排序println("原本的分区数:"+listRDD.partitions.size)println("新的分区数:"+newPartition.partitions.size)---------------------------------------------------原本的分区数:6新的分区数:3repartition(numPartitions:Int)作用:根据分区数,重新通过网络随机洗牌所有数据,分区数可以比原来多也可以少,一定会进行shuffle操作。通过源码发现,其实在底层也调用了coalesce()方法,并且shuffle=truedefrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{coalesce(numPartitions,shuffle=true)}sortBy(func,[ascending],[numPartitions])作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序。func:处理函数ascending:默认true升序,false降序numPartitions:处理后的分区数需求创建一个RDD,按照不同的规则,进行讲叙排序,并指定3个分区创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,8,2,4,6,9,1),8)转换RDDvalresRDD:RDD[Int]=listRDD.sortBy(x=>x,false,3)结果输出如果不指定分区,输出的数据会自动排序println("排序前分区数:"+listRDD.partitions.length)println("排序后分区数:"+resRDD.partitions.length)valr=resRDD.collect()println(r.mkString(","))-----------------------4,6,8,2,1,3,5排序前分区数:8排序后分区数:39,8,6,5,4,2,1

哔哔大数据

什么是RDDRDD(ResilientDistributedDataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD属性一组分区(Partition),即数据集的基本组成单位;一个计算每个分区的函数;RDD之间的依赖关系;一个Partitioner,即RDD的分片函数;一个列表,存储存取每个Partition的优先位置(preferredlocation)。RDD特点RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。RDD分区分区详情查看RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。RDD的创建RDD的创建又三种方式:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。创建RDD,首先要构建Spakr的上下文对象//创建Spark上下文对像valconf:SparkConf=newSparkConf().setMaster("local[*]").setAppName("mkRdd")valsc:SparkContext=newSparkContext(config=conf)集合中创建这种方式又叫从内存创建,Spark主要提供了两种函数:parallelize和makeRDD,查看源码会发现makeRDD底层把数据传给了parallelize,所以这两个是差不多参数传入有序,可重复集合valstrRDD:RDD[String]=sc.makeRDD(Array("1","2","3","4"))valintRDD:RDD[Int]=sc.parallelize(List(1,2,3,4))外部创建路径支持本地文件、HDFS路径,和支持HDFS文件的URL,比如HBasevalfileRDD:RDD[String]=sc.textFile("路径")其他RDD创建RDD使用算子计算后返回的还是RDD

2020-2-13 389 0
哔哔大数据

前言RDD的分区概念在RDD概述里已经说过,这里就不在复述分区分区数越多,任务数越多,执行效率就高makeRDD()分区的体现使用数组创建RDD,直接保存,我们发现数组里只有5的元素,但却输出的8个文件。查看makeRDD()的源码,发现还有第二个参数numSlices这个第二个参数,就是指定切为多少分区。defmakeRDD[T:ClassTag](seq:Seq[T],//注意这里是numSlices,因为textFile()方法有点不一样numSlices:Int=defaultParallelism):RDD[T]=withScope{parallelize(seq,numSlices)}它的默认值(defaultParallelism)是会去判定你是否设置(conf),否者就去获取全部的CPU个数来进行默认设置。overridedefdefaultParallelism():Int={conf.getInt("spark.default.parallelism",math.max(totalCoreCount.get(),2))}textFile()分区的体现同样的保存只产生了两个查看textFile()的源码,第二个参数是minPartitions(最小的)其中调用了hadoopFile(),所以Spark对文件的切分规则跟Hadoop是一样的,(所以就会出现一个指定分区为2,但实际却有3的情况,因为分2个,还会剩下,所以就会产生大于2个)。所以当自定义了分区数,实际情况可能不是这个数deftextFile(path:String,minPartitions:Int=defaultMinPartitions):RDD[String]=withScope{assertNotStopped()hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],minPartitions).map(pair=>pair._2.toString).setName(path)}继续向下看源码,发现首先获取了配置里参数,然后与2作比较,取小的作为默认参数defdefaultMinPartitions:Int=math.min(defaultParallelism,2)

2020-2-13 275 0