Spark Streaming流式处理
目标
- 掌握Spark Streaming的原理架构
- 掌握DStream的相关操作
- 实现spark streaming 与 flume的整合
- 实现spark streaming 与 kafka的整合
1丶 Spark Streaming
概述
- 类似于Storm,用于流式数据的处理
- 高吞吐,容错能力强
- 可扩展,支持多种数据源. Kafka丶Flume丶Twitter..
- Spark高度抽象操作. map丶reduce丶join..
特性
- Ease of Use 易用性
- Fault Tolerance 容错
- Spark Integration 易整合到spark 体系中
2丶Spark Streaming原理*
2.1 原理
Spark Streaming
是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。
2.2 计算流程
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行缓存或者存储到外部设备。下图显示了Spark Streaming的整个流程。
2.3 容错性
对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。
对于SparkStreaming来说,其RDD的传承关系如下图所示:
图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性,所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。
2.4 实时性
对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。
3丶 DStream
3.1 概述
Discretized
Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
对数据的操作也是按照RDD为单位来进行的
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。
3.2 DStream相关操作
Transformations on DStreams
| Transformation | Meaning |
| :——————————: | :————————————–: |
| map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
| flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
| filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
| repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
| union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
| count() | 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
| reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
| countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 |
| reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
| join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream |
| cogroup(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream |
| transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
| updateStateByKey(func) | 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream |特殊的Transformations
1) UpdateStateByKey Operation
UpdateStateByKey用于记录历史记录,保存上次的状态
2) Window Operations(开窗函数)
滑动窗口转换操作:
滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。
(1)红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。
(2)这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
所以基于窗口的操作,需要指定2个参数:
· window length - The duration of the window (3 in the figure)
· slide interval - The interval at which the window-based operation isperformed (2 in the figure).
a.窗口大小,一段时间内数据的容器。
b.滑动间隔,每隔多久计算一次。
4丶 案例分析
(1)安装并启动生产者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)通过netcat工具向指定的端口发送数据
nc -lk 9999
案例一: 利用spark streaming实现socket数据流下的单词计数
SparkStreamingSocket.scala
1 | // 1. 创建sparkConf |
案例二: 利用spark streaming接收socket数据,实现所有批次中单词计数累加
SparkStreamingSocketTotal.scala
1 | // 1. 创建sparkConf |
案例三: 利用spark streaming 开窗函数reduceByKeyAndWindow 实现单词计数
SparkStreamingSocketWindow.scala
每隔十秒,统计之前二十秒中的数据单词出现的次数
1 | // 1. 创建sparkConf |
- 窗口的长度 > 滑动窗口的间隔 –> 数据会被重复处理
- 窗口的长度 < 滑动窗口的间隔 –> 数据会丢失
当我们在正常使用窗口函数的时候
- 保证窗口长度和滑动时间的间隔相同
- 窗口长度和滑动窗口的时间间隔,设置为批次时间的整数倍
案例四: 利用spark streaming实现socket数据,实现一定时间内的热门词汇
SparkStreamingHotWords.scala
1 | // 1. 创建sparkConf |
5丶 sparkstreaming 整合flume
flume作为日志实时采集的框架,可以与SparkStreaming实时处理框进行对接,flume实时产生数据,sparkStreaming做实时处理。
Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Poll拉取数据。
5.1 Poll方式
(1)安装flume1.6以上
(2)下载依赖包
spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下
(3)写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
(4)编写flume-poll.conf配置文件
1 | a1.sources = r1 |
启动脚本
1 | bin/flume-ng agent -n a1 -c /export/software/flume/conf -f /export/software/flume/conf/flume-poll-spark.conf -Dflume.root.logger=INFO,console |
pom依赖
1 | <dependency> |
需求: SparkStreaming与flume整合 —————- poll
SparkStreamingPollFlume.scala
1 | // 1. 创建sparkConf |
5.2 Push方式
编写flume-push.conf配置文件
1 | push mode |
启动脚本
1 | bin/flume-ng agent -n a1 -c /export/software/flume/conf -f /export/software/flume/conf/flume-push-spark.conf -Dflume.root.logger=INFO,console |
需求: SparkStreaming与flume整合 —————push
SparkStreamingPushFlume.scala
1 | // 1. 创建sparkConf |
6丶Spark Streaming 与 kafka整合
6.1 KafkaUtils.createDstream方式
需求: sparkStreaming 与 kafka整合 — Receiver 采用的是kafka的高级API(偏移量 由zk维护)
SparkStreamingKafka_Receiver.scala
1 | // 1. 创建sparkConf |
6.2 KafkaUtils.createDirectStream方式
定期从kafka的topic下对应的partition中查询最新的偏移量,根据偏移量的范围…
相比Receiver方式的优点
A丶 简化并行
不需要创建多个Kafka输入流,然后union,…spark中RDD的分区和和kafka中分区数是一一对应的
B丶 高效
…
C丶 恰好一次语义
需求: sparkStreaming 与 kafka整合 — 不是基于receiver,采用kafka低级API,偏移量不会保存到zk
SparkStreamingKafka_Direct.scala
1 |
|
//todo: 利用checkpoint目录来构建 StreamContext
SparkStreamingKafka_Direct_Checkpoint.scala
1 | // 1. checkpoint目录 |