Spark Partitioner (分区器)之 自定义分区器

自定义分区器

在使用 partitionBy() 函时我们需要传一个分区器。在有些场景下,我们需要指定某些数据放在一个分区,这时自带的分区器不能满足我们的需求。

源码分析

org.apache.spark.HashPartitioner 分区器

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

我们看见这个类继承了 Partitioner() ,继续查看源码

abstract class Partitioner extends Serializable {
  // 创建出来的分区数。
  def numPartitions: Int
  // 返回给定键的分区编号
  def getPartition(key: Any): Int
}

发现这是一个抽象类,里面只有两个方法,我们就可以对比着 HashPartitioner类来看

  • numPartitions(): Int:分区数,这个方法没有做处理,就是传进来的值
  • getPartition(key: Any): Int:传入的参数是 Key ,所以就判断这个 Key返回一个 Int 来确定数据放在哪个分区。

重写 Partitioner 类

把下面这个 RDD,把包含 a的放在 1 分区,b放在 2 分区,其他的放在 0 分区

val listRDD: RDD[(String, String)] = sc.makeRDD(List(("aa", "a"), ("ab", "b"), ("bc", "c"), ("bd", "d"),("dd","d")))

自定义分区

class MyPart(partitions: Int) extends Partitioner {

    // 包含 a 的 1 分区
    // 包含 b 的 2 分区
    // 其他的 0 分区
    // 传入的 参数 至少大于三
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        // 这里注意改变类型,然后才能使用响应的方法
        if (key.toString.indexOf("a") != -1) {
            1
        } else if (key.toString.indexOf("b") != -1) {
            2
        } else {
            0
        }
    }

使用自定义的分区器

val partRDD: RDD[(String, String)] = listRDD.partitionBy(new MyPart(3))

查看分区结果

partRDD.saveAsTextFile("outpath")

mark

发表评论 / Comment

用心评论~