自定义分区器
在使用 partitionBy()
函时我们需要传一个分区器。在有些场景下,我们需要指定某些数据放在一个分区
,这时自带的分区器不能满足我们的需求。
源码分析
org.apache.spark.HashPartitioner 分区器
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions }
我们看见这个类继承了 Partitioner() ,继续查看源码
abstract class Partitioner extends Serializable { // 创建出来的分区数。 def numPartitions: Int // 返回给定键的分区编号 def getPartition(key: Any): Int }
发现这是一个抽象类,里面只有两个方法,我们就可以对比着 HashPartitioner
类来看
numPartitions(): Int
:分区数,这个方法没有做处理,就是传进来的值getPartition(key: Any): Int
:传入的参数是Key
,所以就判断这个Key
,返回一个 Int
来确定数据放在哪个分区。
重写 Partitioner 类
把下面这个 RDD,把包含
a
的放在 1 分区,b
放在 2 分区,其他的
放在 0 分区
val listRDD: RDD[(String, String)] = sc.makeRDD(List(("aa", "a"), ("ab", "b"), ("bc", "c"), ("bd", "d"),("dd","d")))
自定义分区
class MyPart(partitions: Int) extends Partitioner { // 包含 a 的 1 分区 // 包含 b 的 2 分区 // 其他的 0 分区 // 传入的 参数 至少大于三 override def numPartitions: Int = partitions override def getPartition(key: Any): Int = { // 这里注意改变类型,然后才能使用响应的方法 if (key.toString.indexOf("a") != -1) { 1 } else if (key.toString.indexOf("b") != -1) { 2 } else { 0 } }
使用自定义的分区器
val partRDD: RDD[(String, String)] = listRDD.partitionBy(new MyPart(3))
查看分区结果
partRDD.saveAsTextFile("outpath")
版权声明:《 Spark Partitioner (分区器)之 自定义分区器 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-15 14:02:50