添加依赖
注意 Spark Core 的版本和 Scala 的版本
这里有个坑,下面依赖是 Maven 仓库官方的,如果运行报错,就把<scope>provided</scope>
这个删掉
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> <scope>provided</scope> </dependency>
大概使用流程
Spark Streaming
并不是真真意义上的实时处理
,它有一个时间周期的概念,隔多长时间处理一次数据。这个间隔越小,就越接近实时。
创建 StreamingContext 对象
这里的时间周期,有几个官方提供的简单的方式,Milliseconds(毫秒)、Seconds(秒)、Minutes(分钟)…
// Spark 运行配置 val conf = new SparkConf().setMaster("local[*]").setAppName("streaming") // 创建 SparkStreaming 上下文对象 和 采集周期 val context = new StreamingContext(conf, Seconds(3))
设置监控的主机和端口
Spark Streaming 还能监控路径里文件的变化,但是一般不使用,有时候还会不生效,因为这方面 Flume 比它更强。
设置了监控主机和端口后,那么就能得到 DStream
数据集了,底层也是 RDD,完全能使用 RDD 的方法
,并且还有新的方法。
val lineDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 8888)
保持 Streaming 的运行
// 启动 Streaming,开始 接收数据 和 处理流程 context.start() // 等待线程终止,保持 SparkStreaming 的持续运行,等待处理结果 context.awaitTermination()
一个小例子
单词实时统计
这个例子是无状态化转换
的,每个周期的数据是独立的,没有连续统计。
整体代码
// Spark 运行配置 val conf = new SparkConf().setMaster("local[*]").setAppName("streaming") // 创建 SparkStreaming 上下文对象,跟 采集周期 val context = new StreamingContext(conf, Seconds(3)) // 设置监控的主机及端口,返回 DStream 数据集 val lineDStream: ReceiverInputDStream[String] = context.socketTextStream("192.168.176.61", 8888) // 对 DStream 进行处理 val res: DStream[(String, Int)] = lineDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // 打印结果 res.print() // 启动 Streaming,开始 接收数据 和 处理流程 context.start() // 等待线程终止,保持 SparkStreaming 的持续运行,等待处理结果 context.awaitTermination()
使用 natcat 向指定端口发送数据
- Windows:只需要下载、解压、配置环境变量,开启命令
nc -l -p 端口号
- Linux:安装命令
yum install nc
,开启命令nc -lk 端口号
版权声明:《 IDEA 编写 Spark Streaming 程序 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-24 14:02:32