Spark Streaming 有状态更新

概述

Streaming 的状态更新分为两种

  • 无状态更新:每个时间间隔的数据相对独立,不会进行累计,就像之前的单词统计一样,不使用个方法就行。
  • 有状态更新:会把现在统计的结果之前的结果进行合并累计,使用有状态更新 必须设置检查点

S: ClassTag => Option[S]):DStream[(K, S)]" class="reference-link">updateStateByKeyS: ClassTag => Option[S]):DStream[(K, S)]

这个方法有很多种参数,注重这个 updateFunc 函数参数

参数

updateFunc: (Seq[V], Option[S]) => Option[S]

  • Seq[V]:一个集合,集合里是当前计算结果相同 Key 的 Value。
  • Option[S]:该对象能获取之前时间段的 相同 Key 的 Value。

实时单词累计统计

// Spark 通用配置
val conf = new SparkConf().setMaster("local[*]").setAppName("up")

// Streaming 配置
val context = new StreamingContext(conf, Seconds(1))

// 需要使用状态更新 必须 设置检查点机制
context.checkpoint("checkdir")

// 监控端口
val dataDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 5656)

// 处理数据,形成 K-V 对
val mapDstream: DStream[(String, Int)] = dataDStream.flatMap(_.split(" ")).map((_, 1))

// 新老数据合并更新
val stateDStream: DStream[(String, Int)] = mapDstream.updateStateByKey{
    case (seq, option) =>{
        // 获取之前的 Key,如果没有取到,则返回 0 ,说明是新出现的 Key
        val state: Int = option.getOrElse(0)
        // 新旧Key的合并
        val key: Int = state + seq.sum
        // 返回 Option 对象
        Option(key)
    }
}

// 打印结果
stateDStream.print()

// 启动,持久化运行
context.start()
context.awaitTermination()
发表评论 / Comment

用心评论~