使用大数据技术 Spark 计算留存率(Scala版的哦)

留存率说明

在互联网行业当中,因为拉新或推广的活动把用户引过来,用户开始访问网站/应用,但是经过一段时间可能就会有一部分客户逐渐流失了。留存率定义为用户在某段时间内开始使用网站/应用(一般定义是注册),经过一段时间后,仍然继续使用的人被认作是留存用户。留存率体现了网站/应用的质量和保留用户的能力。

留存率计算方法:留存率 = 登录用户数 / 新增用户数 * 100%

一般统计周期为天,常见的周期维度有次日、7日、14日/15日、30日、60日、90日)

比如:

  • 次日留存率:(第一天新增用户数,第2天还登录的用户数)/第一天总注册用户数
  • 7日留存率:(第一天新增用户数,第8天还登录的用户数)/第一天总注册用户数
  • 30日留存率:(第一天新增用户数,第31天还登录的用户数)/第一天总注册用户

Spark 计算难点

留存率涉及到两组数据的计算(新增人数 & 某天留存人数)

计算思路

mark

思路分解

数据例子:日期 - 用户ID

2020-01-01, 1773101
2020-01-01, 1773102
2020-01-02, 1773101
2020-01-02, 1773103
2020-01-03, 1773101
2020-01-03, 1773102

自定义累加器

官方的提供的累加器里没有 mutable.Map() 的累加器

优点:能够通过 Key 方便的取到 Value就是通过 日期,方便的取到 用户ID。

// 自定义 Map 累加器
class mapAccumulator extends AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]] {
    // 累加器存数据Map
    var dateToUser: mutable.Map[String, Iterable[String]] =  mutable.Map[String ,Iterable[String]]()
    // 判断 累加器是否是初始化状态
    override def isZero: Boolean = dateToUser.isEmpty
    // 复制累加器对象
    override def copy(): AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]] = {
        new mapAccumulator()
    }
    // 重置累加器
    override def reset(): Unit = {
        dateToUser.clear()
    }
    // 向累加器放数据
    override def add(a:Array[(String ,Iterable[String])]): Unit = {
        val date: String = a(0)._1
        val users: Iterable[String] = a(0)._2
        dateToUser(date) = users
    }
    // 把其他分区的累加器的数据合并
    override def merge(other: AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]]): Unit = {
         other.value.foreach(line => dateToUser(line._1) = line._2)
    }
    // 返回累加器的结果
    override def value: mutable.Map[String ,Iterable[String]] = dateToUser
}

注册累加器

// 注册累加器
val user_id_lists = new mapAccumulator()
sc.register(user_id_lists)

读取数据 & 分组

val csv_rdd = sc.textFile("文件")
// 相同日期分组
val group_rdd = csv_rdd.groupByKey()

存入累加器

// 按照分区遍历 提高性能
group_rdd.foreachPartition {partition =>
            partition.foreach{
                // 日期为 Key 用户ID为 Value ,形成 Map() 添加到 累加器
                line => user_id_lists.add(Array(line._1 -> line._2.toSet))
            }
        }

广播变量

取出累加器里的数据 进行广播变量

// 广播变量为 累加器的值
val user_id_map: Broadcast[mutable.Map[String, Iterable[String]]] = sc.broadcast(user_id_lists.value)

计算留存率

再次遍历 前面分组的RDD,通过日期取出广播变量里的用户ID

val res_rdd= group_rdd.mapPartitions {
            partition =>
                partition.map{
                    line =>
                        // 获取当前日期的 获取用户列表 ,没有获取到返回空的迭代
                        val at_user_id: Iterable[String] = user_id_map.value.getOrElse(line._1,Iterable())
                        // 获取之前日期 这个函数在文章后面
                        val yesterday = getIntervalDay(line._1, -1) 
                        // 获取之前日期的 用户列表 ,没有获取到返回空的迭代
                        val before_user_id: Iterable[String] = user_id_map.value.getOrElse(yesterday,Iterable())
                        // 获取之前日期总人数
                        val before_all_user: Double = before_user_id.size.toDouble
                        // 留存人数:两日期用户列表去重 取交集
                        val keep_user: Double = before_user_id.toSet.intersect(at_user_id.toSet).size.toDouble
                        // 留存率 留存人数 / 前日期总人数
                        var keep_rate: Double = keep_user / before_all_user

                        if (keep_rate.isNaN){
                            keep_rate = 0
                        }
                        // 返回 日期 - 留存率
                        (line._1, keep_rate)
                }
        }

计算间隔 x天的日期

//  获取指定间隔日期
def getIntervalDay(date: String,num: Int):String= {
    /*
    date :yyyy-MM-dd 格式日期字符串
    num : 间隔天数,有正负数之分。 +(前面) -(后面) -1 就是昨天,1 就是明天
    */
    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal: Calendar = Calendar.getInstance()
    cal.setTime(dateFormat.parse(date))
    cal.add(Calendar.DATE, num)
    var day = dateFormat.format(cal.getTime())
    day
}
发表评论 / Comment

用心评论~