Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing——MIT6.824

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Abstract

本文介绍了弹性的分布式数据集,这是一种分布式的内存抽象形式,它可以让程序员以一种容错的方式在大型集群中进行内存计算。

Introduction

尽管想MapReduce之类的框架已经提供了关于访问集群的计算资源的抽象,但仍然缺乏对于分布式内存的抽象,这使得它们在处理需要重用多个计算中中间结果的应用程序上不够高效。像一些机器学习算法、图算法,交互式数据挖掘都需要对数据子集做临时的查询,但那些框架的做法往往是将其写入到外部的存储系统里,这里IO、序列化之类的开销非常大。

本文提出了一种弹性的分布式数据集的新抽象。其可以将中间结果明确地保存在内存中,控制其分区进行优化放置,并使用一组丰富的运算符进行操作。

Apache Spark应运而生。

Resilient Distributed Datasets (RDDs)

RDD Abstraction

所谓的弹性的分布式数据集,这里的弹性指的是在任何时候都可以进行重算,让用户不会感知到某部分内容曾经丢失过,这是Spark的核心抽象,是一种只读的、分区的数据记录集合。RDD的产生,要么通过从确定存储中获取,要么就是通过其它的RDD进行转换获取,这里的转换包括map、filter和join。

RDD应该有足够的信息,去记录自身是如何从其它数据集派生而来的。用户可以控制RDD的持久化和分区,比如指示重用的RDD和存储策略,也可以命令RDD的元素进行分区。分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

Spark Programming Interface

Spark会使用集成API的方式暴露RDD,其中每个数据集标示为一个对象,并使用对象上的方法进行调用转换。

首先是会对稳定存储中的数据通过转换的方式定义一个或多个RDD,然后在操作中使用这些RDD,比如是返回数据给应用程序,还是导出到存储系统。此外,还可以对RDD进行持久化来指示哪些RDD是需要重用的,默认将持久性RDD保存在内存中,在RAM不够的话,将会将其溢出到磁盘,当然还有很多种持久化策略。

Advantages of the RDD Model

RDD与分布式共享内存最大的区别就是,RDD只能通过粗粒度的转换得来,而DSM则可以读取/写入到每一个内存位置。这样RDD在处理容错时,就不会产生额外的有关checkpoint的开销,如果有分区丢失,RDD可以在不同的节点并行地重建,而不需要回滚整个系统。

由于RDD的不可变特性,系统可以通过运行较慢的备份副本来缓解慢速节点,而DSM在这种情况会因为多副本访问相同内存位置,而产生干扰更新。

Applications Not Suitable for RDDs

RDD更适用于在批处理应用程序中对全集数据执行相同的操作,在这种情况下,RDD能够很好地记录每一步的转换,并且能够在分区丢失时快速恢复。

而对于那些需要对共享状态进行异步更新的应用,RDD则不适合。

Spark Programming Interface

为了使用Spark,开发人员编写了一个驱动程序,用以连接到一组worker,并通过一个或多个RDD来调用action,同时该驱动程序上的Spark代码还可以跟踪RDD的lineage(血统?)。

这些worker是一个长期活跃的进程,在内存中存着RDD分区。

RDD Operations in Spark

下表列出了Spark中可用的RDD转换和可用操作:

Representing RDDs

论文中提到使用了大约14000行Scala代码实现了Spark,这个系统在Mesos集群上运行。每个Spark程序都作为一个单独的Mesos应用程序,具有独立的驱动程序和worker程序,并且这些应用程序之间的资源共享由Mesos处理。

论文中提到RDD的表示是一种基于图的表示。因此对于RDD的表示,则是通过暴露5个接口方法来实现的。

  • Partions:数据集的原子结构;
  • preferredLocations:能更快访问分区的系列节点;
  • dependencies:记录父子RDD的记录;
  • iterator:用于从父RDD计算子RDD;
  • patitioner:数据分区的元信息;

至于如何表示RDD的关系,由于RDD在物理上是分区的,散列在集群不同机器的内存上的,文中将其定义为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)两种。

  • 宽依赖:父RDD中的分区可能被子RDD中的多个分区所依赖;
  • 窄依赖:父RDD的每个分区至多被子RDD中的一个分区所依赖;像map/filter这些操作都属于窄依赖;

这两种依赖的差别在于:窄依赖可以pipeline执行,在失败时只需要重新执行对应的父RDD即可;而宽依赖则需要shuffle,并且如果出现故障恢复则需要重算所有父RDD;

Job Scheduling

每当用户在RDD执行action的时候,调度器就会检查RDD的谱系图,以构建要执行的DAG(有向无环图)。如下图所示:

每个stage内部都包含尽可能多的具有窄依赖的操作。这些stage的边界是宽依赖所必需执行的shuffle操作,另外任何已经计算出的分区都可以使父RDD的计算短路。调度程序会集群上启动任务以计算每个阶段中缺少的分区,直到它计算出目标RDD。

调度器会根据数据局部性的原则来执行delay scheduling算法:

  • 如果任务需要的数据分区在某节点的内存中,则将任务发送到节点上执行;
  • 否则,如果该分区有指定的位置,则直接发送给它;

对于宽依赖,Spark会在存有父分区的节点上暂存shuffle的中间记录,以便做容灾处理,就像mapreduce存下map的输出一样。

如果任务失败,只要stage的父stage还是可用的,就可以将task调度到另一个节点上重新运行即可。如果父stage也失效了,就会重新提交一个计算父stage数据的Task来并行计算丢失的分区。但论文也提到了Spark没有考虑调度器本身的高可用。

Interpreter Integration

Scala包含了交互shell,可以让用户从解释器中交互地运行Spark以查询大数据集。

Scala解释器通常会为用户输入的每一行编译一个类,并将其加载到JVM中通过调用一个函数来进行操作。该类会包含一个单例对象,对象则包含该行上的变量或者函数,并以初始化的方式运行该行代码。

另外,Spark的解释器还做了两处修改:

  1. 类传递:为了让工作节点能够获取在每一行上创建的类的字节码,解释器会通过HTTP为这些类提供服务;
  2. 修改代码的生成:因为每行代码的单例对象都是通过对应的静态方法访问,这意味着无法引用上一行定义的变量。因此需要修改代码的生成,以便直接引用每行对象的实例,如下图:

Memory Management

Spark提供了三种对持久化RDD的存储策略:

  • 未序列化Java对象存在内存;
  • 序列化的数据存于内存;
  • 磁盘存储;

第一种性能最好,因为可以直接访问在Java虚拟机内存里的RDD对象;第二种性能会降低,在空间有限的情况下可以让用户选择比Java对象图更高效的内存表示方式;第三种则是针对RDD太大无法保留在内存中,但每次使用都需要重新计算开销很大时,这个方法会很有用;

为了管理可用的有限内存,Spark在RDD级别使用了LRU逐出策略。当计算了一个新的RDD分区但没有足够存储空间时,就会通过LRU的方式逐出一个分区。除非是该RDD便是新分区对应的RDD,在这种情况下,Spark会将旧的分区保留在内存,避免同一个RDD的分区被循环地调进调出。

Support for Checkpointing

虽然lineage机制可以满足失败后RDD的重建恢复,但对于具有很长链条的RDD来说,恢复时间会很长。特别是包含了宽依赖的长lineage的RDD,因此能设置检查点的操作就会非常有用。Spark当前提供了为RDD设置检查点操作的API,可以用一个REPLICATE标志来持久化,用户自行决定使用方式。