IDEA 编写 Spark Streaming 程序

添加依赖

注意 Spark Core 的版本和 Scala 的版本

这里有个坑,下面依赖是 Maven 仓库官方的,如果运行报错,就把<scope>provided</scope>这个删掉

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
</dependency>

大概使用流程

Spark Streaming 并不是真真意义上的实时处理,它有一个时间周期的概念,隔多长时间处理一次数据。这个间隔越小,就越接近实时。

创建 StreamingContext 对象

这里的时间周期,有几个官方提供的简单的方式,Milliseconds(毫秒)、Seconds(秒)、Minutes(分钟)…

// Spark 运行配置
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")

// 创建 SparkStreaming 上下文对象 和  采集周期
val context = new StreamingContext(conf, Seconds(3))

设置监控的主机和端口

Spark Streaming 还能监控路径里文件的变化,但是一般不使用,有时候还会不生效,因为这方面 Flume 比它更强。

设置了监控主机和端口后,那么就能得到 DStream 数据集了,底层也是 RDD,完全能使用 RDD 的方法,并且还有新的方法。

val lineDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 8888)

保持 Streaming 的运行

// 启动 Streaming,开始 接收数据 和 处理流程
context.start()
// 等待线程终止,保持 SparkStreaming 的持续运行,等待处理结果
context.awaitTermination()

一个小例子

单词实时统计

这个例子是无状态化转换的,每个周期的数据是独立的,没有连续统计。

整体代码

// Spark 运行配置
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")

// 创建 SparkStreaming 上下文对象,跟 采集周期
val context = new StreamingContext(conf, Seconds(3))

// 设置监控的主机及端口,返回 DStream 数据集
val lineDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 8888)

// 对 DStream 进行处理
val res: DStream[(String, Int)] = lineDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// 打印结果
res.print()

// 启动 Streaming,开始 接收数据 和 处理流程
context.start()
// 等待线程终止,保持 SparkStreaming 的持续运行,等待处理结果
context.awaitTermination()

使用 natcat 向指定端口发送数据

  • Windows:只需要下载、解压、配置环境变量,开启命令nc -l -p 端口号
  • Linux:安装命令yum install nc,开启命令nc -lk 端口号

mark

发表评论 / Comment

用心评论~