Spark:6、Spark RDD持久化与检查点机制

RDD的持久化

1) RDD的缓存

Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

2) RDD缓存方式

RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:25

scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27

scala> val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27

scala> cache.cache
res24: cache.type = MapPartitionsRDD[21] at map at <console>:27

scala> nocache.collect
res25: Array[String] = Array(1[1505479375155], 2[1505479374674], 3[1505479374674], 4[1505479375153], 5[1505479375153], 6[1505479374675], 7[1505479375154], 8[1505479375154], 9[1505479374676], 10[1505479374676])

scala> nocache.collect
res26: Array[String] = Array(1[1505479375679], 2[1505479376157], 3[1505479376157], 4[1505479375680], 5[1505479375680], 6[1505479376159], 7[1505479375680], 8[1505479375680], 9[1505479376158], 10[1505479376158])

scala> nocache.collect
res27: Array[String] = Array(1[1505479376743], 2[1505479377218], 3[1505479377218], 4[1505479376745], 5[1505479376745], 6[1505479377219], 7[1505479376747], 8[1505479376747], 9[1505479377218], 10[1505479377218])

scala> cache.collect
res28: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

scala> cache.collect
res29: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

scala> cache.collect
res30: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

cache.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

在存储级别的末尾加上“_2”来把持久化数据存为两份。

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

注意:使用 Tachyon可以实现堆外缓存。

RDD检查点机制

Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo 日志),也不能丢掉,当某个点某个 executor 宕了,上面cache 的RDD就会丢掉,需要通过依赖链重放计算出来,不同的是,checkpoint是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。

如果存在以下场景,则比较适合使用检查点机制:

1) DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。

2) 在宽依赖上做Checkpoint获得的收益更大。

为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

scala> val data = sc.parallelize(1 to 100 , 5)
data: org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[12] at parallelize at <console>:12

scala> sc.setCheckpointDir("hdfs://hadoop001:9000/checkpoint")

scala> data.checkpoint

scala> data.count

scala> val ch1 = sc.parallelize(1 to 2)
ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:25

scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[36] at map at <console>:27

scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at map at <console>:27

scala> ch3.checkpoint

scala> ch2.collect
res62: Array[String] = Array(1[1505480940726], 2[1505480940243])

scala> ch2.collect
res63: Array[String] = Array(1[1505480941957], 2[1505480941480])

scala> ch2.collect
res64: Array[String] = Array(1[1505480942736], 2[1505480942257])

scala> ch3.collect
res65: Array[String] = Array(1[1505480949080], 2[1505480948603])

scala> ch3.collect
res66: Array[String] = Array(1[1505480948683], 2[1505480949161])

scala> ch3.collect
res67: Array[String] = Array(1[1505480948683], 2[1505480949161])

checkpoint写流程

RDD checkpoint 过程中会经过以下几个状态:

[ Initialized → marked for checkpointing → checkpointing in progress → checkpointed ]

转换流程如下:

1) data.checkpoint 这个函数调用中, 设置的目录中, 所有依赖的 RDD 都会被删除, 函数必须在 job 运行之前调用执行, 强烈建议 RDD 缓存在内存中(又提到一次,千万要注意,即checkpoint()之前,先cache()),否则保存到文件的时候需要从头计算。初始化RDD的 checkpointData 变量为 ReliableRDDCheckpointData。 这时候标记为 Initialized 状态

2) 在所有 job action 的时候,runJob 方法中都会调用 rdd.doCheckpoint , 这个会向前递归调用所有的依赖的RDD,看看需不需要 checkpoint。如果需要,然后调用 checkpointData.get.checkpoint(),里面标记状态为CheckpointingInProgress,里面调用具体实现类的 ReliableRDDCheckpointData的doCheckpoint 方法。

3) doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 如果已经cache 了,就可以直接使用缓存中的 RDD 了, 就不需要重头计算一遍了(怎么又说了一遍),这时候直接把RDD,输出到 hdfs,每个分区一个文件,会先写到一个临时文件,如果全部输出完,进行rename,如果输出失败,就回滚delete。

4) 标记 状态为 Checkpointed, markCheckpointed方法中清除所有的依赖, 怎么清除依赖的呢, 就是把RDD 变量的强引用设置为 null,垃圾回收了,会触发 ContextCleaner 里面的监听,清除实际 BlockManager 缓存中的数据。

checkpoint读流程

如果一个RDD我们已经checkpoint了那么是什么时候用呢,checkpoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driverprogram使用。比如sparkstreaming挂掉了,重启后就可以使用之前checkpoint的数据进行recover,当然在同一个driverprogram也可以使用。我们讲下在同一个driverprogram中是怎么使用checkpoint数据的。

具体细节如下:

如果一个RDD被checkpoint了,那么这个RDD中对分区和依赖的处理都是使用的RDD内部的checkpointRDD变量,具体实现是ReliableCheckpointRDD类型。这个是在checkpoint写流程中创建的。依赖和获取分区方法中先判断是否已经checkpoint,如果已经checkpoint了,就斩断依赖,使用ReliableCheckpointRDD,来处理依赖和获取分区。
如果没有,才往前回溯依赖。依赖就是没有依赖,因为已经斩断了依赖,获取分区数据就是读取checkpoint到hdfs目录中不同分区保存下来的文件。

如果在 Scala 中出现了 NotSerializableException,通常问题就在于我们传递了一个不可序列 化的类中的函数或字段。
您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------