Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing
Mesa是一个可扩展的分析型数据仓库,可用于存储Google广告业务相关的数据,能满足多数据中心、高可用、近实时等特性。
INTRODUCTION
Mesa满足了以下的要求:
- 原子更新;
- 一致性和正确性;
- 高可用性,能承受整个数据中心或者园区挂掉;
- 近实时更新的吞吐,支持全量和增量的更新;
- 查询性能,以低延时满足高吞吐;
- 可扩展性,数据读写性能都能随着集群规模实现线性增长;
- 支持热更新表的schema;
Mesa充分利用了Google内部的基础设施,包括Colossus、BigTable和BigTable等。Mesa主要是分shard存储,批量更新,每个更新分配一个版本号来满足MVCC,同时基于Paxos算法做元数据的一致性管理。
MESA STORAGE SUBSYSTEM
Mesa存储的数据是多维度的,其中维度属性称为keys,指标属性则是values。
The Data Model
Mesa的数据是以表的概念来维护的,每个表都会有一个schema。通常来说,表会有key空间和对应的value空间,分别就是前面提到的维度和对应的指标。同时values列需要定一个聚合函数,例如SUM、MIN、MAX等。聚合函数必须满足结合律,可选择性满足交换律。Mesa往往会存储上千张表,每个表有几百个列。如下图就是三张经典的Mesa表,其中表C是表B的物化视图:
Updates and Queries
为了实现高吞吐,Mesa是通过批量的方式进行更新,上游数据源以分钟级的频率批量更新。每一个更新都有对应的一个递增版本号,更新以串行的方式进行。因此对于Mesa的查询需要提供版本号和Predicate,这样就可以在[0, n]的版本key集合里做filter。
如下图所示,表A和B经历了两个更小的Batch,其中表C是B的物化视图,B的更新,对应的物化视图都能保持和原表的一致性原子更新。
另外对于一些数据会滚的需求,Mesa支持negative facts,则对一些指标列做减法,以最终一致的实现来实现会滚。
Versioned Data Management
数据版本在Mesa的读写过程中扮演着非常重要的角色,但也存在一些问题,一是存储成本会变高,二是无论查询还是更新,聚合所有版本的代价也会比较高。
Mesa的做法就是提出Delta的概念,每一个Delta包含的是不重复key的数据,并使用[V1, V2]表示版本号,V1小于或等于V2,其中的数据就是在版本号V1和V2之间更新的key,value则是这些更新操作聚合后的。另外由于每个delta内部数据都是有序的,因此合并可以以线性的时间完成。
Mesa对于delta的结构分成了三层:每次批量写入都会当作是一个单例delta合并到Mesa,单例delta的V1等于V2。因为Mesa对于指标列都会有相关的聚合函数,因此delta[V1, V2]和delta[V2, V3]可以通过合并key、聚合value的方式合并成delta[V1, V3],这些就是cumulative delta。另外还存在一个base delta,设它的版本号为[0, B], 其中B大于或等于0,生成base delta,后续任意一个[V1, V2]只要满足0 <= V1 <= V2 <= B就可以被删除(唯一的删除条件),这就是异步的base compaction。这也是为什么Mesa仅仅支持一段时间以内的所有版本,比如24小时内的,因为更早的版本已经可能被聚合到base delta里。
如下图,对于版本n的查询,就可以通过聚合这三层的delta来返回值。任何时刻都存在一个基本delta[0, B], 一系列的累积delta[B+1, B+10], [B+1, B+20],[B+1, B+30],...以及B以后的所有单例delta。这样的好处就是,对于某个版本n的查询,可以方便通过查询cumulative delta来减少IO开销。举个例子,如果查询版本91的数据,在没有cumulative delta的情况下,就需要查询61-91这32次的delta;如果存在cumulative delta,则只需要一次base的查询,61-90的cumulative,外加91这一个单例delta。
Mesa的解决思路总结起来就是:及时删除过期数据,merge小文件。
Physical Data and Index Formats
Mesa的delta,无论哪些类型其存储格式都是一样的,并且都是immutable。因此Mesa关于物理数据的存储主要关注空间成本以及查询性能。关于存储,论文没介绍太多细节,主要是分成index files和data files。Mesa将delta的行按顺序存储在大小受限的data files中,若干行的数据会组织成一个row blocks,每个row blocks则是按照column进行存储(提高压缩率,并且因为查询性能的问题优先考虑解压效率高的)。Index files存储的则是row blocks第一个key的固定长度前缀以及对应row blocks在data files中的偏移量,然后就可以将index files加载进内存通过二分查找去读数据。
MESA SYSTEM ARCHITECTURE
Single Datacenter Instance
每一个Mesa实例都包含了两个子系统:update/maintenance系统和querying系统,这些子系统可以独立扩展。元数据信息存在BigTable里,数据文件则是存在Google的Colossus。
Update/Maintenance Subsystem
update and maintenance子系统主要负责以下的操作:加载更新数据、执行表压缩,在线进行Schama修改,检查表的checksum等。这些操作都是由下图的controller/worker framework完成的。
controller可以看作是表元数据的cache,同时负责worker的调度,worker队列的管理。controller不做任何数据相关的工作,只负责调度和元数据管理。元数据存在BigTable上,controller会去订阅表的更新,同时也是元数据唯一的修改方。
worker组件则是负责在每个Mesa实例中的具体数据操作工作,不同的worker是隔离的,有自己独立的职责,有一组独立的worker池。空闲的worker会定期轮询controller,请求对应类型的任务,收到工作任务后,会去验证并处理,最后则在任务完成后通知controller。图中还有一个Garbage Collector,主要是负责清理因为worker执行失败而留下的中间状态数据。worker与controller之间通过租约的方式,防止挂掉的worker一直霸占着任务,同时controller也只接受分配的worker的任务结果,确保执行安全。
Query Subsystem
Mesa的query subsystem由下图的查询服务器组成,这些服务器接收用户查询、从元数据和数据集中查找对应内容、执行相关聚合、并在返回client前将数据转换到client协议格式。
Mesa的客户端对不同的请求有不同要求,有些要求低延时、有些要求高吞吐。因此Mesa会通过标记工作负载和隔离、优先级等机制来满足不同的延迟和吞吐量要求。
出于性能的考虑,相似的数据查询往往会路由到某个查询服务器的子集(比如同一张表的查询都由某一批查询服务器负责),这样做的好处就是,查询服务器可以通过预取和缓存的方式来提供低延时保证。在启动时,每个查询服务器都会向Global Locator Service注册所主动缓存的表列表,client可以通过这个列表来决定如何路由。
Multi-Datacenter Deployment
Mesa可以多中心部署,每个数据中心是相互隔离的的,有一份独立的数据。
Consistent Update Mechanism
Mesa中的表都是多版本的,上游系统每几分钟生成一批更新数据以供Mesa合并,如下图,Mesa入了committer组件引入了committer组件。committer为每个更新批次分配一个新版本号,并将与更新相关的所有元数据发布到版本数据库(a globally replicated and consistent data store build on top of the Paxos consensus algorithm),应该就是spanner或者F1。committer是无状态的,可以多中心部署。
Mesa的controller会监听版本数据库,以检测新更新,然后将相应的工作分配给Update workers,并将更新结果报告回版本数据库。然后committer会检查verion提交的一致性条件是否满足(例如Mesa表的物化视图是否已经更新完成)。当满足提交标准时,committer将在版本数据库里更新版本号。
Mesa的更新机制对性能非常友好:MVCC使得Mesa不需要在查询和更新之间无锁;所有更新数据都由各Mesa实例异步合并,元数据则基于Paxos协议同步更新。
New Mesa Instances
Mesa会使用P2P的方法,通过一个load worker去加载新的Mesa实例,可以将表从另一个Mesa实例复制到当前的表。另外这一机制也支持从损坏的表中恢复。
ENHANCEMENTS
Query Server Performance Optimizations
论文中心还提到了Mesa关于查询性能的优化:
- delta pruning:查询服务器会检查描述每个delta包含的键范围的元数据,避免读取不必要的delta;
- scan-to-seek:对于非第一个key有filter的查询,可以用scan-to-seek来优化索引,避免读取不必要的数据;
- resume key:Mesa 通常以流方式将数据返回给客户端,每一次返回一个block,对于每一个block,Mesa都会附加一个resume key。如果查询超时,受影响的Mesa客户端可以透明地切换到另一个查询服务器,从resume key的地方继续查询,而不是重新执行整个查询;
Parallelizing Worker Operation
Mesa利用MapReduce框架来并行化处理worker任务。
Schema Changes in Mesa
Mesa用户经常需要修改schemas,一些常见的更改比如添加或者删除列、添加或删除索引等。Mesa使用两种技术来执行在线的schemas变更:
- 拷贝固定版本的表数据并按照新的schema进行存储;
- 回放并更新当前版本和之前固定版本的数据;
- 更新元数据的schema;
- 直到旧schema没有查询,则删除旧的数据;
但这种方法成本很高,需要临时存有两份存储资源,也需要删掉历史数据。另一种技术则是linked schema change,不再是重新灌一遍历史数据,而是对增量数据以新的schema处理。如果是新加的列,对于历史数据则以数据类型的默认值填充。但这种方法无法处理所有情况:比如删除修改列等。
Mitigating Data Corruption Problems
这一章主要是讲Mesa在容错方面的努力,通过在线写入的检查和定期离线的全量检查来确认数据不会有损坏。当出现数据损坏时,Mesa实例会自动从另一个实例重新加载该表的正常副本。如果都损坏了,则从备份中恢复旧版本的表并重放更新。
CONCLUSIONS
本文介绍了Google近实时、可扩展的数据仓库的设计与实现——Mesa系统。Mesa支持在线查询和批量更新,同时提供强大的一致性和事务正确性保证,并且在数据模型上有非常创新的理论设计。