返回 登录
0

Schemaless架构(三):Uber基于MySQL的Trip数据库

阅读73648

Schemaless架构(三):Uber基于MySQL的Trip数据库
Schemaless架构(二):Uber基于MySQL的Trip数据库
Schemaless架构(一):Uber基于MySQL的可扩展数据库

图片描述

本文介绍Schemaless的主要功能:Schemaless trigger的细节与案例。本文是系列文章的第三部分;第一部分是关于Schemaless的设计,第二部分是讨论其架构。

Schemaless trigger是一项具有可扩展性、容错性和无损性的技术,监听Schemaless实例中的变更。在行程(trip)流程中起到引擎的作用,从司机按下“结束行程”并向系统提交费用,直到相应数据进入数据库等待分析。在Schemaless系列的最后一篇中,我们将深入讲解Schemaless trigger的功能,以及如何开发出这个可扩展的容错系统。

简单来说,在Schemaless数据的基本单位被命名为单元(cell)。它是不可变的,一旦写入,便无法被覆盖。(在特殊情况下,我们可以删除旧记录);单元可以被行键(row key)、列名(column name)和引用键(ref key)来引用;单元内容通过编写引用键更高的新版来执行更新,但行键和列名保持不变。Schemaless不对其中存储的数据执行任何操作(故而命名schemaless)。从Schemaless的观点来看,它只负责存储JSON对象

Schemaless Trigger案例

我们来看一下实践中Schemaless trigger的运作方式。下面的代码是简化版的异步计费方式(大写标注Schemaless的列名)。案例Python代码


#我们实例化一个客户端,以便与Schemaless实例通讯
schemaless_client = SchemalessClient(datastore=’mezzanine’)

#为BASE列注册一个bill_rider功能
@trigger(column=’BASE’)

def bill_rider(row_key):
  # row_key是行程的UUID
  status = schemaless_client.get_cell_latest(row_key, ‘STATUS’)
  if status.is_completed:
    #也就是说我们已经提交了乘客的账单
    return

#否则就尝试提交账单
#我们从BASE列拿到了基本行程信息
trip_info = schemaless_client.get_cell_latest(row_key, ‘BASE’)  

#提交乘客账单
result = call_to_credit_card_processor_for_billing_trip(trip_info)

if result != ‘SUCCESS’:
  #提交例外,让Schemaless trigger稍后重试。
  raise CouldNotBillRider()

#成功提交乘客账单,写入Mezzanine
schemaless_client.put(row_key, status, body={‘is_completed’: True, ‘result’: result})

在Schemaless实例中,我们在函数中通过添加decorator@trigger来定义trigger,并指定列。如果指定列的单元中有内容,通知Schemaless trigger框架调用函数——本例是bill_rider。这里通过BASE中的一个新单元表明行程结束。触发trigger,然后通过函数来发送行键——本例是行程UUID。如果需要更多数据,必须从Schemaless实例——本例是从行程存储Mezzanine中获取真实数据。

bill_rider trigger函数的信息流见下表(这里是乘客结账)。箭头方向指明调用方与被调方,旁边的数字指明流程的顺序:

图片描述

首先将行程输入Mezzanine,Schemaless Trigger框架调用bill_rider。在调用时,函数向行程存储请求STATUS列的最新信息。本例中is_completed字段不存在,也就是说乘客尚未结账。然后获得BASE列的行程信息,通过函数调用信用卡provider来结账。在本例中,我们成功用信用卡付费,并返回成功信息到Mezzanine,然后设置STATUS列的is_completed为True。

Trigger框架确保在每个Schemaless实例中的每个单元至少调用bill_rider一次。一般来说只触发trigger函数一次,不过在出错的情况下(无论是trigger功能还是其他功能短暂出错),都可能需要多次调用该函数。也就是说trigger函数是幂等的,在本例中要检查单元是否处理完毕。如果答案为是,则返回函数。

在查看下文中Schemaless如何在流程中提供支持时,要记得这个案例。我们将会解释Schemaless如何被看作变更日志,并讨论与Schemaless相关的API,分享让流程支持可扩展和可容错的技术。

将Schemaless视为日志

Schemaless包含所有单元,也就是说包含指定行键、列keypair的所有版本。由于包含单元的所有历史版本,除了随机访问key-value存储外,Schemaless还可作为变更日志。事实上它就是一个分区日志,每个分片都是自己的日志,如下图:

图片描述

根据行键(也就是UUID)将每个单元写入特定的分片。分片中的所有单元都有唯一标识符,称为添加ID。添加ID是一个自动递增的字段,代表着单元的插入顺序(越新的单元,添加ID的数字越大)。除了添加ID之外,每个单元都有单元写入的时间(datetime)。在所有分片备份中,单元的添加ID是唯一的,这点对于故障时转移非常重要。

Schemaless的API支持随机访问和日志类访问。随机访问API是针对单独的单元,均由row_key、column_key和ref_key一同定义。

Schemaless还包含这些API端点的批处理版本,这里省略。之前说过的trigger函数bill_rider就使用这些函数来获取并操纵单个单元。

对于日志类访问API,我们关心单元的分片数字与时间戳以及添加ID(合称位置location):

图片描述

与随机访问API类似,日志访问API有更多可用的knob,实时从多个分片中抓取单元,不过上面的端点更为重要。位置可以是timestamp或added_id。调用get_cells_for_shard,除了单元之外,还返回下一个添加ID。例如,如果调用位置1000的get_cells_for_shards,请求10个单元,返回的下一个位置偏移是1010。

追踪日志

通过日志类访问API,可以追踪Schemaless实例,就像可以在系统中追踪文件一样(比如tail -f),或者类似最新变更轮询的事件队列(比如Kafka)。然后,客户端持续追踪偏移,并将其用在轮询中。要想引导追踪程序,需要从第一条开始(比如位置0),或从任何时间,或偏移后。
Schemaless trigger通过使用日志类访问API完成相同的追踪,并保持追踪偏移。轮询API的好处直接表现在,通过Schemaless trigger让这个过程具有可扩展性与容错性。通过配置从哪个Schemaless实例、哪一列开始轮询数据,将客户端程序与Schemaless trigger框架链接。使用的函数或回调与框架中的数据流相关,在新单元格插入实例时通过Schemaless trigger或调用或触发。反过来,通过框架在程序所运行的主集群中找到要找的工作进程。框架将工作分到可用进程中,然后通过将分到故障进程的工作分配给其他可用进程,巧妙地解决出现故障的进程。work分配代表着程序员只用编写处理程序(比如trigger函数),并确保它是幂等的。剩下的交给Schemaless trigger来处理。

架构

在这部分中,我们会讨论Schemaless trigger如何扩展,如何将故障影响最小化。下图从较高角度展示了其架构,取自之前的账单结算服务:

图片描述

账单结算服务使用了运行在三台不同主机上的Schemaless trigger,我们(简单起见)假设每个主机只有一个工作进程。Schemaless trigger框架区将分片按工作进程区分开,因此每个工作进程只负责处理一个特定的分片。注意:工作进程1从分片1拉取数据,工作进程2从分片2和分片5拉取数据,工作进程3从分片3和分片4拉取数据。一个工作进程只处理指定分片的单元,抓取新单元、为这些分片调用注册的回调函数。一个工作进程就是指定的leader,负责向工作进程分派片区。如果进程挂起,leader将为故障进程分配的片区重新分配给其他进程。

在一个分片中,单元都是以写入顺序来触发。也就是说如果特定单元的trigger总是由于程序错误而出现故障,就会阻碍该片区的单元处理。为了避免延迟,可以配置Schemaless trigger来标记多次出错的单元,并将它们放在单独的队列中。之后,Schemaless trigger就会继续下一个单元的处理。如果标记单元的数字超过了特定阈值,trigger就会停止。通常代表着系统错误,需要人工修复。

通过存储每个片区中最近一次成功触发单元的添加ID,Schemaless trigge继续保持追踪。该框架将这些偏移保存到共享存储中,比如Zookeeper或Schemaless实例自身,也就是说如果程序重启,trigger就会继续从存储片区的存储偏移开始执行。共享存储也用在meta-info中,比如协调选出leader,探知添加或移除的工作进程。

可扩展性与容错性

Schemaless trigger是为可扩展而设计的。在被追踪的Schemaless实例中,对于任意客户端程序,我们能够添加最多与片区数量一致的工作进程(通常是4096)。此外,我们能够在线添加或移除worker,来独立处理Schemaless实例中其他trigger客户端的变动负载。通过在框架中追踪进度,我们可以为要发送数据的Schemaless实例添加尽可能多的客户端。在服务器端并没有逻辑来持续追踪客户端或者将状态推送过去。

Schemaless trigger也是容错的任何进程故障都可以不影响系统。

  • 如果一个客户端worker的进程出错,leader会将这个work重新分配,确保所有片区都有进程。

  • 如果Schemaless trigger节点上的一个leader出错,会有新的节点被选成leader。在leader选举期间,可以继续处理单元,不过work不能执行重分配工作,也无法移除和添加进程。

  • 如果分片存储(比如ZooKeeper)出错,单元进程持续进行。不过就像在leader选举期间一样,work无法执行重分配工作,而在分片存储出错时进程也无法变更。

  • 最后,在Schemaless实例中,Schemaless trigger框架是不可能出现故障的。任何数据库节点出错都没关系,因为Schemaless trigger可以从备份读取。

总结

从运营角度来看,Schemaless trigger非常好用。由于通过随机访问API或者日志类访问API都可以访问数据,Schemaless是理想的真实数据来源存储库。通过Schemaless trigger最后,我们可以在运行时添加更多的存储服务器,来增加数据容量与性能。如今,Schemaless trigger框架驱动着整个行程处理流,包括分析数据库中的,以及跨数据中心复制后的。我们对未来的前景倍感兴奋。

原文链接:USING TRIGGERS ON SCHEMALESS, UBER ENGINEERING’S DATASTORE USING MYSQL (翻译/孙薇 责编/仲浩)

评论