未分类

行动算子行动算子:是把RDD的对象转换成Scala的相关类型。转换算子:就是RDD之间相互转换的算子collect()作用:在驱动程序中,以数组的形式返回RDD的所有元素。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组valresRDD:Array[Int]=listRDD.collect()reduce(f:(T,T)=>T)作用:通过函数把数据两两进行聚合,先聚合分区内数据,再聚合分区间数据。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换结果为数组如果是K-V类型,也是两两传入K-V对进行聚合。valres:Int=listRDD.reduce(_+_)println(res)---------------55first()作用:返回RDD中第一个数据take(num:Int)作用:返回该RDD的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.take(3)println(res.mkString(","))---------------5,4,7takeOrdered(num:Int)作用:返回该RDD排序后的前n个元素组成的数组创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))取前三条valres:Array[Int]=listRDD.takeOrdered(3)println(res.mkString(","))---------------1,4,5count()作用:返回RDD中元素的个数统计vallistRDD:RDD[Int]=sc.makeRDD(List(5,4,7,6,1))valres:Long=listRDD.count()println(res)------------5aggregate(zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U)作用:使分区内和分区间可以使用不同的函数聚合。zeroValue:两两聚合时第一次调用的值。seqOp:分区内聚合的函数。combOp:分区间聚合的函数。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次。valres:Int=listRDD.aggregate(0)(_+_,_+_)println(res)------------55fold(zeroValue:T)(op:(T,T)=>T)作用:是aggregate()的精简版,分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:两两聚合时第一次调用的值。op:分区内和分区间聚合的函数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,2)聚合打印这个初始值在每个分区内聚合时会调用一次,还在分局间聚合时会调用一次,跟aggregate()一样valres:Int=listRDD.fold(0)(_+_)println(res)---------------55文件保存把RDD数据通过文件的方式保存saveAsTextFile(路径):将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsObjectFile(路径):将数据集中的元素以Hadoopsequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。saveAsSequenceFile(路径):用于将RDD中的元素序列化成对象,存储到文件中。countByKey()作用:针对(K,V)类型的RDD,返回一个(K,Int)的Map,表示每一个K对应的元素个数。创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("a","b","c","a"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))统计打印valslMap:collection.Map[String,Long]=kvRDD.countByKey()println(slMap)--------------Map(a->2,b->1,c->1)foreach()作用:这个foreach()是对RDD操作,没有返回值。与Scala里的用法一样,只是使用的对象不一样。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)遍历打印listRDD.foreach(println)--------------52314

未分类

Key-Value类型算子都在org.apache.spark.rdd.PairRDDFunctions里面partitionBy(partitioner:Partitioner)作用:按照RDD的Key进行分区操作,如果原有的分区数和现有的分区数是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程。参数partitioner:分区器,需要导入org.apache.spark.HashPartitioner需求:创建一个4个分区的RDD,对其重新分区延展:自定义分区器(重点)创建RDDvallistRDD:RDD[(String,String)]=sc.makeRDD(List(("aa","a"),("ab","b"),("bc","c"),("bd","d")),4)转换RDDHashPartitioner分区器最终导致数据在哪个分区,有它自己的算法,所以我们不能控制,就需要自定义分区器valpartRDD:RDD[(String,String)]=listRDD.partitionBy(newHashPartitioner(3))结果输出看不出来是把Key进行了什么算法分区的partRDD.saveAsTextFile("outpath")groupByKey()作用:把有相同Key的Value聚合在一起,最后生成生成一个K-V集合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.groupByKey()结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,CompactBuffer(1))(Scala,CompactBuffer(1))(Python,CompactBuffer(1,1))reduceByKey(func:(V,V)=>V,[numPartitions:Int])作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,第二个是聚合之后的分区数func:(V,V)=>V:有相同k的v,这个v之间的运算聚合创建RDDvallistRDD:RDD[String]=sc.makeRDD(List("Python","Java","Scala","Python"))valkvRDD:RDD[(String,Int)]=listRDD.map((_,1))转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.reduceByKey(_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()r.foreach(println)------------------(Java,1)(Scala,1)(Python,2)aggregateByKey(zeroValue:U)(seqOp:(U,V)=>U,combOp:(U,U)=>U)作用:能先把分区内(seqOp)有相同Key的数据进行聚合,然后再进行分区间(combOp)有相同Key的数据进行聚合。就是能定义分区内和分区间进行不同的聚合方式.zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。seqOp:分区内做聚合的函数combOp:分区间做聚合的函数需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.aggregateByKey(0)(List(_,_).max,_+_)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,13),(a,3),(c,5)foldByKey(zeroValue:U)(func:(V,V)=>V))作用:是aggregateByKey()的精简版,使分区内和分区间进行相同的聚合操作。其实底层还是先聚合分区内,再聚合分区间。zeroValue:是先两两做聚合,所以在取第一个Key做聚合时,需要一个zeroValue(初始值)来两两运算。func:做聚合的函数需求:创建一个pairRDD,取出不同Key对应pairRDD最大值。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,Int)]=kvRDD.foldByKey(0)(List(_,_).max)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(b,8),(a,3),(c,5)combineByKey(参数是:三个函数)作用:对相同K,把V合并成一个集合。这个方法是aggregateByKey()的完全版。初始值的大小,格式需要使用函数进行处理。createCombiner:V=>C:对第一个V进行初始化处理,大小,格式等。mergeValue:(C,V):这是分区内的聚合,C的格式是初始化之后的格式,V是两两操作的第二个值mergeCombiners:(C,C)=>C:这是分区间的处理需求:创建一个pairRDD,把相同的Key的Value相加,并记录相同的Key有多少个。创建RDDvalkvRDD=sc.makeRDD(List(("a",11),("a",31),("b",51),("b",81),("c",51),("b",71)),2)转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.combineByKey(//对第一个V做初始化格式处理,形成元组,为了方便记录次数(_,1),//分区内部两两处理,(a:(Int,Int),v)=>(a._1+v,a._2+1),//分区间,和加和,次数加次数(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,a._2+b._2))结果输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))-----------------------(b,(203,3)),(a,(42,2)),(c,(51,1))sortByKey([ascending:Boolean],[numPartitions:Int])作用:通过K-V的K来排序。[ascending:Boolean]:默认升序[numPartitions:Int]:分区数,或者叫任务数创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDD降序valresRDD:RDD[(String,Int)]=kvRDD.sortByKey(false)结果输出valr:Array[(String,Int)]=resRDD.collect()println(r.mkString(","))------------------(c,5),(b,5),(b,8),(b,7),(a,1),(a,3)mapValues(f:V=>U):RDD[(K,U))作用:传入函数,只对K-V里的V做处理需求:给每个K-V的V加上¥符号。创建RDDvalkvRDD=sc.makeRDD(List(("a",1),("a",3),("b",5),("b",8),("c",5),("b",7)),2)转换RDDvalresRDD:RDD[(String,String)]=kvRDD.mapValues(_+"¥")结果输出valr:Array[(String,String)]=resRDD.collect()println(r.mkString(","))------------------------(a,1¥),(a,3¥),(b,5¥),(b,8¥),(c,5¥),(b,7¥)join(other:RDD[(K,W)])作用:组合。在类型为(K,V)和(K,W)的两个RDD上调用。两个RDD,以相同的K为依据,不同的RDD相互组合V,返回(K,(V,W))形式的RDD。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)),2)valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5)))转换RDDvalresRDD:RDD[(String,(Int,Int))]=kvRDD.join(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Int,Int))]=resRDD.collect()println(r.mkString(","))------------------------(a,(1,1)),(b,(5,5)),(b,(8,5)),(c,(5,2))cogroup(other:RDD[(K,W)])作用:组合。这个组合更强大,把两个K-VRDD,通果相同的K把V组合起来,在同一RDD内V在一个迭代类型内,然后再把另外一个RDD组合的迭代类型一起组合成一个大K-V的V。创建RDD创建2个RDDvalkvRDD=sc.makeRDD(List(("a",1),("b",5),("b",8),("c",5),("x",1)))valkvRDD1=sc.makeRDD(List(("a",1),("c",2),("b",5),("b",5)))转换RDDvalresRDD:RDD[(String,(Iterable[Int],Iterable[Int]))]=kvRDD.cogroup(kvRDD1)结果输出注意输出没有(“x”,1)这一对,说明当只有单个时,不能输出valr:Array[(String,(Iterable[Int],Iterable[Int]))]=resRDD.collect()r.foreach(println)------------------------(a,(CompactBuffer(1),CompactBuffer(1)))(b,(CompactBuffer(5,8),CompactBuffer(5,5)))(c,(CompactBuffer(5),CompactBuffer(2)))(x,(CompactBuffer(1),CompactBuffer()))

未分类

双Value类型双Value类型是指两个RDD之间的操作并集union(other:RDD[T])作用:并集;对源RDD和参数RDD求并集后返回一个新的RDD创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(2to7)转换RDDvalresRDD:RDD[Int]=listRDD.union(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2,3,4,5,2,3,4,5,6,7交集intersection(other:RDD[T],[numPartitions:Int])作用:交集,返回两个RDD中相同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.intersection(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------3,4,5差集subtract(other:RDD[T],[numPartitions:Int])作用:差集;返回源RDD中与参数RDD中不同的值。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to8)转换RDDvalresRDD:RDD[Int]=listRDD.subtract(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------1,2intersection(other:RDD[T])作用:笛卡尔积(尽量避免使用)创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.cartesian(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(1,6),(1,7),(2,5),(2,6),(2,7),(3,5),(3,6),(3,7)zip(other:RDD[T])作用:将两个RDD按照元素顺序组合成Key/Value形式的RDD,两个RDD的partition数量以及元素数量都相同,否则会抛出异常。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to5)vallistRDD1:RDD[Int]=sc.makeRDD(3to7)转换RDDvalresRDD:RDD[(Int,Int)]=listRDD.zip(listRDD1)结果输出valr=resRDD.collect()println(r.mkString(","))------------------------(1,5),(2,6),(3,7)

未分类

自定义分区器在使用partitionBy()函时我们需要传一个分区器。在有些场景下,我们需要指定某些数据放在一个分区,这时自带的分区器不能满足我们的需求。源码分析org.apache.spark.HashPartitioner分区器classHashPartitioner(partitions:Int)extendsPartitioner{require(partitions>=0,s"Numberofpartitions($partitions)cannotbenegative.")defnumPartitions:Int=partitionsdefgetPartition(key:Any):Int=keymatch{casenull=>0case_=>Utils.nonNegativeMod(key.hashCode,numPartitions)}overridedefequals(other:Any):Boolean=othermatch{caseh:HashPartitioner=>h.numPartitions==numPartitionscase_=>false}overridedefhashCode:Int=numPartitions}我们看见这个类继承了Partitioner(),继续查看源码abstractclassPartitionerextendsSerializable{//创建出来的分区数。defnumPartitions:Int//返回给定键的分区编号defgetPartition(key:Any):Int}发现这是一个抽象类,里面只有两个方法,我们就可以对比着HashPartitioner类来看numPartitions():Int:分区数,这个方法没有做处理,就是传进来的值getPartition(key:Any):Int:传入的参数是Key,所以就判断这个Key,返回一个Int来确定数据放在哪个分区。重写Partitioner类把下面这个RDD,把包含a的放在1分区,b放在2分区,其他的放在0分区vallistRDD:RDD[(String,String)]=sc.makeRDD(List(("aa","a"),("ab","b"),("bc","c"),("bd","d"),("dd","d")))自定义分区classMyPart(partitions:Int)extendsPartitioner{//包含a的1分区//包含b的2分区//其他的0分区//传入的参数至少大于三overridedefnumPartitions:Int=partitionsoverridedefgetPartition(key:Any):Int={//这里注意改变类型,然后才能使用响应的方法if(key.toString.indexOf("a")!=-1){1}elseif(key.toString.indexOf("b")!=-1){2}else{0}}使用自定义的分区器valpartRDD:RDD[(String,String)]=listRDD.partitionBy(newMyPart(3))查看分区结果partRDD.saveAsTextFile("outpath")

未分类

前言RDD整体上分为Value类型和Key-Value类型Value类型map(func)作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个数据valresRDD:RDD[Int]=intRDD.map(_*2)结果输出valr=resRDD.collect().mkString(",")println(r)----------2,4,6,8,10,12,14,16,18,20mapPartitions(func)作用:类似于map,但独立地在RDD的每一个分区上运行,因此func的函数类型必须是(Interator[T])=>Iterator[U]。效率要优于map()但分区数据太大容易造成内存溢出的错误(OOM)需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个分区,所以匿名函数``传入是可迭代类型,返回也要可迭代类型valresRDD:RDD[Int]=intRDD.mapPartitions((iter)=>{//这里的map()是Scala中的函数iter.map(i=>i*2)})结果输出valr=resRDD.collect().mkString(",")println(r)----------2,4,6,8,10,12,14,16,18,20mapPartitionsWithIndex(func)作用:类似于mapPartitions,但func带有一个整数参数表示所在分区的索引值,因此func的函数类型必须是(Int,Interator[T])=>Iterator[U];需求:创建一个RDD,使元素跟所在分区索引形成一个元组组成一个新的RDD创建RDDvalintRDD:RDD[Int]=sc.makeRDD(1to10)转换RDD方法转换重点是每一个分区valresRDD:RDD[(String,Int)]=intRDD.mapPartitionsWithIndex{//有多个传入参数时,一般使用casecase(num,datas)=>{datas.map(("分区"+num,_))}}结果输出valr=resRDD.collect().mkString(",")println(r)----------(分区1,1),(分区3,2),(分区4,3),(分区6,4),(分区7,5),(分区9,6),(分区11,7),(分区12,8),(分区14,9),(分区15,10)flatMap(func)作用:类似于Scala中的flatMap(),每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)需求:创建一个Array数组中的List元素RDD,映射成Array数组RDD创建RDDvallistRDD:RDD[List[Int]]=sc.makeRDD(Array(List(1,2),List(3,4)))转换RDDvalresRDD:RDD[Int]=listRDD.flatMap(list=>list)结果输出valr=resRDD.collect().mkString(",")println(r)----------1,2,3,4glom()作用:将每一个分区形成一个数组,形成新的RDD类型是RDD[Array[T]]创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,3)转换RDDvalarrayRDD:RDD[Array[Int]]=listRDD.glom()结果输出分区不能平均存放数据时,多余的数据会往后面的分区存放valres=arrayRDD.collect()res.foreach(nums=>println(nums.mkString(",")))------------------------------------------------1,2,34,5,67,8,9,10groupBy()作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器,返回一个元组:RDD[(Int,Iterable[Int])],前一个Int是传入函数返回值。需求:区分奇偶数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[(Int,Iterable[Int])]=listRDD.groupBy(_%2)结果输出valr=resRDD.collect()r.foreach(println)------------------(0,CompactBuffer(2,4,6,8,10))(1,CompactBuffer(1,3,5,7,9))filter(func)作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。需求:过滤奇数,留下偶数创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[Int]=listRDD.filter(_%2==0)结果输出valr=resRDD.collect()println(r.mkString(","))------------------2,4,6,8,10sample(withReplacement,fraction,[seed])作用:数据抽样withReplacement:布尔值true:有放回的抽样,采用PoissonSampler抽样器(Poisson分布),false:无放回的抽样,采用BernoulliSampler的抽样器fraction:当为true时,这就是数学中的期望当为false时,这就是概率,值的范围[0,1]seed:随机数种子,不好把握,保持默认就好需求:创建一个RDD(1-10),从中选择不放回抽样创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10)转换RDDvalresRDD:RDD[Int]=listRDD.sample(false,0.4)结果输出其实每次抽样结果的值和值的个数的个数都不一样,但会保持在一个数学算法的范围里valr=resRDD.collect()println(r.mkString(","))------------------------5,7,9,10distinct([numPartitions])作用:去重。因为去重之后,数据会减少,各个数据原本所在的分区会被洗牌打乱,所以其中就有shuffle操作,参数是去重后所产生的分区数量,可选。需求:去重,指定两个分区。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(6,2,1,2,8,3,5,5,4,1,2,2))转换RDDvalresRDD:RDD[Int]=listRDD.distinct(2)结果输出如果不指定分区,输出的数据会自动排序valr=resRDD.collect()println(r.mkString(","))-----------------------4,6,8,2,1,3,5coalesce(numPartitions,[shuffle])作用:减少分区数。用于大数据集过滤后,提高小数据集的执行效率。参数如果大于原本的分区数,则分区数不变,默认false不进行shuffle操作。需求:减少分区数。创建RDDvallistRDD:RDD[Int]=sc.makeRDD(1to10,6)转换RDDvalnewPartition:RDD[Int]=listRDD.coalesce(3)结果输出如果不指定分区,输出的数据会自动排序println("原本的分区数:"+listRDD.partitions.size)println("新的分区数:"+newPartition.partitions.size)---------------------------------------------------原本的分区数:6新的分区数:3repartition(numPartitions:Int)作用:根据分区数,重新通过网络随机洗牌所有数据,分区数可以比原来多也可以少,一定会进行shuffle操作。通过源码发现,其实在底层也调用了coalesce()方法,并且shuffle=truedefrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{coalesce(numPartitions,shuffle=true)}sortBy(func,[ascending],[numPartitions])作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序。func:处理函数ascending:默认true升序,false降序numPartitions:处理后的分区数需求创建一个RDD,按照不同的规则,进行讲叙排序,并指定3个分区创建RDDvallistRDD:RDD[Int]=sc.makeRDD(List(5,8,2,4,6,9,1),8)转换RDDvalresRDD:RDD[Int]=listRDD.sortBy(x=>x,false,3)结果输出如果不指定分区,输出的数据会自动排序println("排序前分区数:"+listRDD.partitions.length)println("排序后分区数:"+resRDD.partitions.length)valr=resRDD.collect()println(r.mkString(","))-----------------------4,6,8,2,1,3,5排序前分区数:8排序后分区数:39,8,6,5,4,2,1

未分类

什么是RDDRDD(ResilientDistributedDataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD属性一组分区(Partition),即数据集的基本组成单位;一个计算每个分区的函数;RDD之间的依赖关系;一个Partitioner,即RDD的分片函数;一个列表,存储存取每个Partition的优先位置(preferredlocation)。RDD特点RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。RDD分区分区详情查看RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。RDD的创建RDD的创建又三种方式:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。创建RDD,首先要构建Spakr的上下文对象//创建Spark上下文对像valconf:SparkConf=newSparkConf().setMaster("local[*]").setAppName("mkRdd")valsc:SparkContext=newSparkContext(config=conf)集合中创建这种方式又叫从内存创建,Spark主要提供了两种函数:parallelize和makeRDD,查看源码会发现makeRDD底层把数据传给了parallelize,所以这两个是差不多参数传入有序,可重复集合valstrRDD:RDD[String]=sc.makeRDD(Array("1","2","3","4"))valintRDD:RDD[Int]=sc.parallelize(List(1,2,3,4))外部创建路径支持本地文件、HDFS路径,和支持HDFS文件的URL,比如HBasevalfileRDD:RDD[String]=sc.textFile("路径")其他RDD创建RDD使用算子计算后返回的还是RDD

2020-2-13 953 0
未分类

前言RDD的分区概念在RDD概述里已经说过,这里就不在复述分区分区数越多,任务数越多,执行效率就高makeRDD()分区的体现使用数组创建RDD,直接保存,我们发现数组里只有5的元素,但却输出的8个文件。查看makeRDD()的源码,发现还有第二个参数numSlices这个第二个参数,就是指定切为多少分区。defmakeRDD[T:ClassTag](seq:Seq[T],//注意这里是numSlices,因为textFile()方法有点不一样numSlices:Int=defaultParallelism):RDD[T]=withScope{parallelize(seq,numSlices)}它的默认值(defaultParallelism)是会去判定你是否设置(conf),否者就去获取全部的CPU个数来进行默认设置。overridedefdefaultParallelism():Int={conf.getInt("spark.default.parallelism",math.max(totalCoreCount.get(),2))}textFile()分区的体现同样的保存只产生了两个查看textFile()的源码,第二个参数是minPartitions(最小的)其中调用了hadoopFile(),所以Spark对文件的切分规则跟Hadoop是一样的,(所以就会出现一个指定分区为2,但实际却有3的情况,因为分2个,还会剩下,所以就会产生大于2个)。所以当自定义了分区数,实际情况可能不是这个数deftextFile(path:String,minPartitions:Int=defaultMinPartitions):RDD[String]=withScope{assertNotStopped()hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],minPartitions).map(pair=>pair._2.toString).setName(path)}继续向下看源码,发现首先获取了配置里参数,然后与2作比较,取小的作为默认参数defdefaultMinPartitions:Int=math.min(defaultParallelism,2)

2020-2-13 782 0
未分类

概述Spark的三种运行方式Local模式:直接解压,就可以运行Yarn模式:需要配置HadoopYarnStandalone模式:Spark独立运行模式Spark安装机器数需启动的进程所属者Local1无SparkStandalone多台Master及WorkerSparkYarn1Yarn及HDFSHadoop配置构建一个由Master+n个Slave构成的Spark集群,Spark运行在集群中。配置Worker运行节点在Spakr的conf/复制模板cpslaves.templateslavesslaves文件是指定Worker运行的机器//如果只有一台机器,这个文件就不用改//如果是配置集群模式,就把原本的localhost改成各个机器名masterslave1slave2配置Master运行端口在Spark的conf/目录下复制模板cpspark-env.sh.templatespark-env.sh修改Spark-env.sh//添加上SPARK_MASTER_HOST=masterSPARK_MASTER_PORT=7077//如果产生了notsetJAVA_HOME的错误,就加上JAVA_HOMEexportJAVA_HOME=/usr/java/jdk1.8.0_221//修改完记得source分发配置的单机器就不用分发启动启动命令在sbin/目录下[root@mastersbin]#./start-all.shstartingorg.apache.spark.deploy.master.Master,loggingto/usr/local/src/spark/spark-2.0.0/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.outlocalhost:startingorg.apache.spark.deploy.worker.Worker,loggingto/usr/local/src/spark/spark-2.0.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-master.out[root@mastersbin]#jps5863Worker5927Jps5688Master运行测试bin/spark-submit\--classorg.apache.spark.examples.SparkPi\./examples/jars/spark-examples_2.11-2.0.0.jar\100

未分类

概述Spark可以直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出yarn-cluster(集群):Driver程序运行在RM(ResourceManager)启动的AP(APPmaster)适用于生成环境。配置修改hadoop配置文件yarn-site.xml,添加如下内容<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true--><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property><!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true--><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>修改Spark/conf下的spark-env.sh,添加Hadoop的etc/hadoop,修改完记得sourceYARN_CONF_DIR=/usr/local/src/hadoop/hadoop-2.6.0/etc/hadoop配置集群就分发,不是集群就不用分发//分发spark-env.shscpspark-env.shslave1:/usr/local/src/spark/spark-2.0.0-bin-hadoop2.6/conf/scpspark-env.shslave2:/usr/local/src/spark/spark-2.0.0-bin-hadoop2.6/conf///分发yarn-site.xmlscpyarn-site.xmlslave1:/usr/local/src/hadoop/hadoop-2.6.0/etc/hadoop/scpyarn-site.xmlslave2:/usr/local/src/hadoop/hadoop-2.6.0/etc/hadoop/提交一个例子测试(注意启动Hadoop和Yarn)bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\./examples/jars/spark-examples_2.11-2.0.0.jar\100日志查看因为在Spark计算,Yarn调度资源,Yarn不能获取到Spark的执行日志,所以需要配置一下修改配置文件Spark/conf下的spark-defaults.conf如下首先创建好存放目录hadoopfs-mkdir/historyserverforSparkpark.yarn.historyServer.address=master:18080spark.history.ui.port=18080spark.eventLog.enabled=truespark.eventLog.dir=hdfs://master:9000/historyserverforSparkspark.history.fs.logDirectory=hdfs://master:9000/historyserverforSpark重启spark历史服务//关闭sbin/stop-history-server.sh//开启sbin/start-history-server.sh提交任务到Yarn执行bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\./examples/jars/spark-examples_2.11-2.0.0.jar\100Web页面查看日志

2020-2-13 772 0
未分类

搭建前的准备搭建Scala环境首先需要你的IDEA需要搭建好Scala的环境(可以参考这篇)还需要一个和Hadoop的Windows版二进制文件下载地址:http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe搭建配置winutils创建一个目录E:\winutils\bin把winutils.exe放进去配置环境变量//新建变量变量名:HADOOP_HOME变量值:E:\winutils//把新建变量加入Path%HADOOP%\bin这样就安装好了配置IDEA在IDEA配置好Scala后,就只需要在pom.xml文件里添加Spark依赖就好pom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.bigdataboy</groupId><artifactId>SparkDemo</artifactId><version>1.0</version><dependencies><!--Spark依赖--><!--https://mvnrepository.com/artifact/org.apache.spark/spark-core--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.0.0</version></dependency></dependencies></project>等待依赖下载完成Spark案例第一个Spark案例,单词统计统计数据helloScalahelloJavahelloPython统计代码defmain(args:Array[String]):Unit={//设定Spark运行环境valconf:SparkConf=newSparkConf().setMaster("local[*]").setAppName("wc")//创建Spark上下文连接对象valsc=newSparkContext(conf)//读取文件,一行数据为一个元素vallines:RDD[String]=sc.textFile("./src/main/Scala/data.txt")//空格分隔,使其扁平化valwords:RDD[String]=lines.flatMap(_.split(""))//格式化数据,方便统计valwordToOne:RDD[(String,Int)]=words.map((_,1))//聚合valwordToSum:RDD[(String,Int)]=wordToOne.reduceByKey(_+_)//收集并打印结果println(wordToSum.collect().mkString(","))//保存结果wordToSum.saveAsTextFile("./src/main/Scala/out")}

未分类

什么是SparkSpark是一种基于内存的快速、通用、可扩展的大数据分析引擎发展历史2009年诞生,采用Scala编写2010年开源2013年6月成为Apache孵化项目2014年2月成为Apache顶级项目Spark特点快比MapReduce相比要快百倍以上,实现了DAG执行引擎,通过内存高效处理数据流易用Spark支持Java、Python、Scala、R等API,还支持超过80种高级算法,使用户可以快速构建不同的应用。还支持Python和Scala的交互式Shell。通用Spark提供了一套统一的解决方案。Spark可以用于批处理、交互式查询(SparkSQL)、实时处理(SpakrStreaming)、机器学习(SparkMLlib)、图计算(GraphX),这些不同类型的处理都可以无缝使用。兼容性Spark可以十分方便的与其他开源产品进行融合,比如:可以使用HAdoop的Yarn和Apache的Mesos作为资源管理和调度器,并且可处理所有支持Hadoop的数据,包括HDFS、HBase,这样可以使得已经使用过Hadoop集群的用户非常友好,不需要做任何的数据迁移就可以使用Spark强大的处理能力。Spark的内置模块SparkCore:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。SparkCore中还包含了对弹性分布式数据集(ResilientDistributedDataSet,简称RDD)的API定义。SparkSQL:是Spark用来操作结构化数据的程序包。通过SparkSQL,我们可以使用SQL或者ApacheHive版本的SQL方言(HQL)来查询数据。SparkSQL支持多种数据源,比如Hive表、Parquet以及JSON等。SparkStreaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与SparkCore中的RDDAPI高度对应。SparkMLlib:提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(ClusterManager)上运行,包括HadoopYARN、ApacheMesos,以及Spark自带的一个简易调度器,叫作独立调度器。

2020-1-26 835 0