留存率说明
在互联网行业当中,因为拉新或推广的活动把用户引过来,用户开始访问网站/应用
,但是经过一段时间可能就会有一部分客户逐渐流失了
。留存率定义为用户在某段时间内开始使用网站/应用(一般定义是注册),经过一段时间后,仍然继续使用的人
被认作是留存用户
。留存率体现了网站/应用的质量和保留用户的能力。
留存率计算方法:留存率 = 登录用户数 / 新增用户数 * 100%
一般统计周期为天,常见的周期维度有次日、7日、14日/15日、30日、60日、90日)
比如:
- 次日留存率:(第一天新增用户数,第2天还登录的用户数)/第一天总注册用户数
- 7日留存率:(第一天新增用户数,第8天还登录的用户数)/第一天总注册用户数
- 30日留存率:(第一天新增用户数,第31天还登录的用户数)/第一天总注册用户
Spark 计算难点
留存率涉及到两组数据的计算
(新增人数 & 某天留存人数)
计算思路
思路分解
数据例子:日期 - 用户ID
2020-01-01, 1773101 2020-01-01, 1773102 2020-01-02, 1773101 2020-01-02, 1773103 2020-01-03, 1773101 2020-01-03, 1773102
自定义累加器
官方的提供的累加器里
没有 mutable.Map() 的累加器
优点:能够通过 Key
方便的取到 Value
。就是通过 日期,方便的取到 用户ID。
// 自定义 Map 累加器 class mapAccumulator extends AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]] { // 累加器存数据Map var dateToUser: mutable.Map[String, Iterable[String]] = mutable.Map[String ,Iterable[String]]() // 判断 累加器是否是初始化状态 override def isZero: Boolean = dateToUser.isEmpty // 复制累加器对象 override def copy(): AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]] = { new mapAccumulator() } // 重置累加器 override def reset(): Unit = { dateToUser.clear() } // 向累加器放数据 override def add(a:Array[(String ,Iterable[String])]): Unit = { val date: String = a(0)._1 val users: Iterable[String] = a(0)._2 dateToUser(date) = users } // 把其他分区的累加器的数据合并 override def merge(other: AccumulatorV2[Array[(String ,Iterable[String])],mutable.Map[String ,Iterable[String]]]): Unit = { other.value.foreach(line => dateToUser(line._1) = line._2) } // 返回累加器的结果 override def value: mutable.Map[String ,Iterable[String]] = dateToUser }
注册累加器
// 注册累加器 val user_id_lists = new mapAccumulator() sc.register(user_id_lists)
读取数据 & 分组
val csv_rdd = sc.textFile("文件") // 相同日期分组 val group_rdd = csv_rdd.groupByKey()
存入累加器
// 按照分区遍历 提高性能 group_rdd.foreachPartition {partition => partition.foreach{ // 日期为 Key 用户ID为 Value ,形成 Map() 添加到 累加器 line => user_id_lists.add(Array(line._1 -> line._2.toSet)) } }
广播变量
取出累加器里的数据 进行广播变量
// 广播变量为 累加器的值 val user_id_map: Broadcast[mutable.Map[String, Iterable[String]]] = sc.broadcast(user_id_lists.value)
计算留存率
再次遍历
前面分组的RDD
,通过日期
取出广播变量
里的用户ID
val res_rdd= group_rdd.mapPartitions { partition => partition.map{ line => // 获取当前日期的 获取用户列表 ,没有获取到返回空的迭代 val at_user_id: Iterable[String] = user_id_map.value.getOrElse(line._1,Iterable()) // 获取之前日期 这个函数在文章后面 val yesterday = getIntervalDay(line._1, -1) // 获取之前日期的 用户列表 ,没有获取到返回空的迭代 val before_user_id: Iterable[String] = user_id_map.value.getOrElse(yesterday,Iterable()) // 获取之前日期总人数 val before_all_user: Double = before_user_id.size.toDouble // 留存人数:两日期用户列表去重 取交集 val keep_user: Double = before_user_id.toSet.intersect(at_user_id.toSet).size.toDouble // 留存率 留存人数 / 前日期总人数 var keep_rate: Double = keep_user / before_all_user if (keep_rate.isNaN){ keep_rate = 0 } // 返回 日期 - 留存率 (line._1, keep_rate) } }
计算间隔 x天的日期
// 获取指定间隔日期 def getIntervalDay(date: String,num: Int):String= { /* date :yyyy-MM-dd 格式日期字符串 num : 间隔天数,有正负数之分。 +(前面) -(后面) -1 就是昨天,1 就是明天 */ var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") var cal: Calendar = Calendar.getInstance() cal.setTime(dateFormat.parse(date)) cal.add(Calendar.DATE, num) var day = dateFormat.format(cal.getTime()) day }
版权声明:《 使用大数据技术 Spark 计算留存率(Scala版的哦) 》为明妃原创文章,转载请注明出处!
最后编辑:2020-4-16 13:04:28