概述
Streaming 的状态更新分为两种
- 无状态更新:每个时间间隔的数据相对独立,
不会进行累计
,就像之前的单词统计一样,不使用个方法就行。 - 有状态更新:会把
现在统计的结果
与之前的结果
进行合并累计
,使用有状态更新必须设置检查点
。
S: ClassTag => Option[S]):DStream[(K, S)]" class="reference-link">updateStateByKeyS: ClassTag => Option[S]):DStream[(K, S)]
这个方法有很多种参数,注重这个 updateFunc 函数参数
。
参数
updateFunc: (Seq[V], Option[S]) => Option[S]
- Seq[V]:一个集合,集合里是
当前计算结果
相同 Key 的 Value。 - Option[S]:该对象能获取
之前时间段
的 相同 Key 的 Value。
实时单词累计统计
// Spark 通用配置 val conf = new SparkConf().setMaster("local[*]").setAppName("up") // Streaming 配置 val context = new StreamingContext(conf, Seconds(1)) // 需要使用状态更新 必须 设置检查点机制 context.checkpoint("checkdir") // 监控端口 val dataDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 5656) // 处理数据,形成 K-V 对 val mapDstream: DStream[(String, Int)] = dataDStream.flatMap(_.split(" ")).map((_, 1)) // 新老数据合并更新 val stateDStream: DStream[(String, Int)] = mapDstream.updateStateByKey{ case (seq, option) =>{ // 获取之前的 Key,如果没有取到,则返回 0 ,说明是新出现的 Key val state: Int = option.getOrElse(0) // 新旧Key的合并 val key: Int = state + seq.sum // 返回 Option 对象 Option(key) } } // 打印结果 stateDStream.print() // 启动,持久化运行 context.start() context.awaitTermination()
版权声明:《 Spark Streaming 有状态更新 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-26 11:02:20