Percolator

Percolator

本文是谷歌的经典论文,介绍了一个对大型数据集做增量处理更新的系统Percolator,谷歌用它来构建索引系统,极大地提高了处理速度。Percolator基于BigTable构建的,由于BigTable不支持跨行事务,更像是给BigTable打补丁。

Introduction

索引系统是Google Web搜索的核心系统,在应对海量索引数据时,索引创建和索引的实时更新必须要面对的挑战。Google使用Mapreduce解决了高效创建索引的问题,但MR对于实时更新的场景是不合适的,因此他们构建了一个新的增量更新系统Percolator。Percolator主要关注的是跨行事务和Notification,支持在PB级别存储库中进行随机访问,并提供强一致性的保证。

Design

Percolator为了大规模的增量更新提供了两个抽象:

  • 基于随机访问库的ACID事务;
  • observers,一种处理增量计算的方式;

每个Percolator系统包含三个二进制文件:Percolator的worker、一个BigTable的tablet服务器和一个GFS chunkserver。所有的observer都会连接到Percolator的worker中,该worker会扫描所有在BigTable中发生改变的列,然后调用observer中的回调逻辑。另外Percolator还会依赖两个服务:timestamp oracle和一个轻量级锁服务,前者通过递增的时间戳提供了快照隔离协议,后者则是依赖锁服务来搜索“dirty notification”。

BigTable

Percolator是在BigTable基础上构建,数据被组织到BigTable的行列中,元数据则存储在旁边的特殊列中,基于BigTable的接口封装了大量的API,主要目的是提供BigTable缺失的功能:多行事务和observer框架。

至于BigTable和SSTable所在的GFS的具体实现可以查看对应的论文。

Transactions

Percolator使用ACIS快照隔离来基于BigTable的跨行跨表事务。Percolator使用Bigtable中的timestamp,对每个数据项都存储多版本,以实现快照隔离。在一个事务中,按照某个timestamp读出来的版本数据就是一个快照,然后再用一个往后的timestamp写入新数据。快照隔离可以有效解决write- write冲突,如果事务A和B并行运行,同时往某个cell执行写操作,大部分情况下都能正常提交。任意的timestamp都代表了一个一致快照,读取一个cell仅仅需要用给出的timestamp执行BigTable查询即可。

考虑到Percolator不能直接控制对存储介质的访问,而是需要修改BigTable的状态,所以Percolator需要明确地维护锁,以实现分布式事务。这个锁服务需要具备几个特点:高可用,能够解决锁在2PC阶段消失的情况;高吞吐,上千台机器同时请求锁;低延时,读请求需要读取锁。BigTable作为存储介质,恰好满足这些需求,所以Percolator将数据和锁存储在同一行,特殊的内存列存取锁。访问某一行数据时,Percolator将在一个BigTable行事务中同时对同行的锁进行Read and Modify。

下图是Percolator在执行事务期间,数据和元数据的布局情况。以银行转账为例,Bob向Joe转7元,该事务从start_ts=7开始,commit_ts=8结束。

下图则展示了Percolator在BigTable中的列所展现的作用,其在BigTable中使用了5个列,其中3个与事务相关:

  • c:lock:事务产生的锁,未commit的事务会写该列,映射对是{key,start_ts}=>{primary_key};
  • c:write: 已commit的数据信息,映射对是{key,commit_ts}=>{start_ts};
  • c:data: 具体存储的数据,映射对是{key,start_ts} => {value};

事务的处理流程则是经典的两阶段提交,首先是Prewrite

  • client首先从Oracle获取全局唯一的时间戳start_ts;
  • client然后从所有key中选出一个primary,其余作为secondaries,并将所有数据写入请求并行发往存储节点;
    • 存储节点首先会进行write-write冲突检查,从c:write获取当前key的最新数据,如果该列中的commit_ts>=start_ts,则返回写冲突错误;
    • 然后会检查key是否被锁,如果锁了则返回错误;
    • 向c:lock写入{key, start_ts} => {primary_key};
    • 向c:data写入{key,start_ts} => {value};
1
2
3
4
5
6
7
8
9
10
11
12
// prewrite tries to lock cell w, returning false in case of conflict.
bool Prewrite(Write w, Write primary) {
Column c = w.col;
bigtable::Txn T = bigtable::StartRowTransaction(w.row);

if (T.Read(w.row, c+"write", [start_ts_, max])) return false;
if (T.Read(w.row, c+"lock", [0, max])) return false;

T.Write(w.row, c+"data", start_ts_, w.value);
T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col});
return T.Commit();
}

Prewrite成功后,则进入第二阶段commit

  • 从Oracle获取全局唯一的时间戳commit_ts;
  • 向primary key所在节点发起commit请求;
  • primary commit成功后则标记为事务成功了,紧接着就是向secondaries发起commit请求(事实上这里primary commit成功后,即可响应client,后续异步往secondaries发起commit即可);
  • 存储节点的处理:
    • 首先是检查key的lock是否合法;
    • 往c:write写入{key,commit_ts}=>{start_ts};
    • 清除c:lock中内容,释放锁;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool Commit() {
Write primary = write_[0];
vector<Write> secondaries(write_.begin() + 1, write_.end());
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;

int commit_ts = oracle.GetTimestamp();

// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
T.Write(p.row, p.col+"write", commit_ts, start_ts_); // Pointer to data written at start_ts_
T.Erase(p.row, p.col+"lock", commit_ts); // 应该是start_ts_
if(!T.Commit()) return false; // commit point

// Second phase: write our write records for secondary cells.
for (Write w:secondaries) {
bigtable::write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}

Percolator的读取操作则相对简单,由于c:write记录了key的commit记录,client读取key的时候会先从c:write找到start_ts_,然后到c:data查找相对应的数据,具体流程:

  • 检查[0, start_ts_]内是否存在锁,若存在,则意味着有未commit的事务,client则必须进行等待和cleanup操作;
  • 否则,获取最新的commit记录,从c:write中获取start_ts;
  • 根据{key, start_ts}从c:data中获取数据;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool Get(Row row, Column c, string* value) {
while(true) {
bigtable::Txn = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"locks", [0, start_ts_])) {
// There is a pending lock; try to clean it and wait
BackoffAndMaybeCleanupLock(row, c);
continue;
}
}

// Find the latest write below our start_timestamp.
latest_write = T.Read(row, c+"write", [0, start_ts_]);
if(!latest_write.found()) return false; // no data
int data_ts = latest_write.start_timestamp();
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}

至于事务处理过程中如何应对异常:若commit一个事务时出现了异常,导致前面Prepare阶段的锁留下来,为避免阻塞住后来的事务,Percolator采取lazy的方式清理这些锁,即访问到了这个key才会去处理。

Prewrite阶段遇到锁冲突会直接返回失败,因此锁的清理是在读阶段进行的。当事务执行过程中commit失败时,事务会留下一个commit point(Primary Key写入c:write了),但可能留下一些锁没有清理。另一个事务发现锁冲突时,会去Primary上查找primary lock是否存在。如果存在,说明前面的事务没有提交,进行roll back;如果不存在,则需要检查c:write是否已经被写入,写入了就说明事务已经被成功提交,此时执行Roll Forward(在secondaries上将c:lock替换成c:write)。BigTable的行级事务避免了数据竞争。

Timestamps

时间戳oracle是一个分配严格递增时间戳的服务器,考虑到每个事务需要调用oracle两次,因此oracle需要具备很好的可扩展性。oracle会定期分配一定范围的时间戳,并把该范围的最大值持久化存储,这样如果服务器挂了就直接从上次范围的最大值作为开始值进行分配。为了减少RPC消耗,Percolator的worker会维持一个长连接RPC到oracle,低频批量地获取时间戳。

事务协议使用严格递增的时间戳,保证了Get操作能看到所有在start_ts之前已提交的写操作。

Notifications

Percolator提供了一种方法来触发和运行事务,用户编写的代码即observer会表的变化而触发,observer会被放入Percolator worker中,随着每一个tablet服务器运行。每个observer都会向Percolator注册一个function和它感兴趣的列,一旦这些列发生了变化就会调用function。

与数据库中的触发器不一样,假设写操作触发了observer,但他们会运行在各自的事务中,产生的结果不是原子的。Percolator提供了一种保证:对于一个被观察列的变化,至多一个observer的事务被提交。反之则不然,对于一个被观察列的多次变化,可能只会触发一次observer事务。

为了实现通知机制,Percolator为每个被监测的列额外提供一个“acknowledgment”列。包含最近一次observer事务的开始时间戳。当被监测的列发生改变时,Percolator启动一个事务来处理通知,该事务读取被监测列和它对应的acknowledgment列,判断acknowledgment列的时间戳是否在被检测列之前,若是则意味着可以开启observer事务,否则意味着已经有observer被运行了。

为了实现通知机制,Percolator需要高效找到被观察的脏cell,其在BigTable的locality group维护了一个特殊的“notify”列,表示该cell是否为脏,当一个事务对被监测列进行写入时,同时会写对应的notify cell。每个Percolator的worker指定几个线程负责扫描这些脏cell。

Percolator的通知机制主要是异步实现的,当改变发生时,并不是立刻以同步方式调用observer,而是写入一个notify列,等worker线程扫描到才会调用observer。

总结

Percolator的一大特点就是构建在仅支持单行事务的BigTable之上,提供了良好的跨行事务,实现了比较简洁的分布式事务。但其性能本身不够高效,每个work都需要发送大量的RPC(比如获取两次事务timestamp,比如可能读取secondary的lock列是指向primary的,还要多读取一次),虽然论文提到了一些合并RPC,延迟发送,提高并行和增大BatchSize等措施来优化RPC的调用,但Percolator对于写协议本身也要需要多次在BigTable做持久化,读的话则可能遇到由于先写primary再同步到其他参与者导致的锁被持有而等待的问题。