返回 登录
3

分布式实时处理系统架构设计与机器学习实践

编者按:在2017年的1月11日,CSDN高级架构师金牌授课群为群友们带来了第一次的分享,讲师和主题参见这里,本文为课程后续的文字整理,第一时间发出来分享给读者,课件下载点击这里

大家好,我们今天主要讨论以下几个问题:

  1. 机器学习与实时处理系统应用
  2. 分布式计算拓扑搭建
  3. 消息算法调优
  4. Hurricane计算框架与未来展望

机器学习与实时处理系统应用

现在我们先来看看第一部分:机器学习与实时处理系统应用。我们首先简单了解下机器学习,然后引入分布式实时处理系统的概念以及实时处理系统与机器学的关系。

机器学习在现实世界中的作用越来越重要。

机器学习的方法非常多,比如传统的知识库方法,类比方法,归纳方法,演绎方法等各种方法。

目前在大多数领域中应用最多的当属归纳学习方法。

在通常的归纳型机器学习中,我们的目标是让计算机学习到一个“模型”(这种模型是人类预先组织好的,有固定的数据结构和算法等等),然后我们就可以用这个“模型”来进行“预测”。 预测就是从现实中输入一些数据,通过学习到的模型进行计算,得到的输出。我们希望这个模型可以在很高的概率下输出一个和真实结果差距不大的结果。

一旦我们得到了这个模型,我们可以使用该模型处理输入数据,得到输出数据(即预测结果),而归纳性机器学习的任务就是学习中间的这个模型。

如果我们将这个模型看成一个函数,那么我们可以认为归纳性机器学习的目的就是学习得到一个函数F,如果该函数的参数为x,输出为y。那么我们希望学到的东西就是 y = F(x) 中的F。

我们先用一个最简单的例子来讲一下:

假设我们现在不知道一个物体自由落体速度的计算公式,需要学习如何预测一个物体的自由落体速度,机器学习的第一步就是收集数据,假设我们可以测量出物体下坠的任何时间点的速度,那么我们需要收集的数据就是某个物体的下坠时间和那个时间点的速度。

现在我们收集到一系列数据:

时间 物体速度
1 9.7
2 20.0
3 29.0
4 39.9
5 49.4
6 58.5
7 69.0
8 78.8
9 89.0

我们这里给出两个假设。第一个假设是,一个物体自由落体的速度只和时间有关系 第二个假设是,我们可以使用一个简单的“模型”:一元一次函数得到物体的速度。(即 F(x) = ax + b)

在这个问题中,a、b 这就是这个模型待学习的“参数”。

现在的问题就是——我们需要用什么策略来学习这些参数?因为我们可以遍历的数值空间是无穷大的,因此我们必须采用某种策略指导我们进行学习。我们就用非常朴素的思想来将解决这个问题吧。

在正式学习前,我们先将收集的数据分成两组,一组是“训练数据”,一组是“测试数据” 。

假设训练数据是:

1 9.7
2 20.0
3 29.0
4 39.9
5 49.4
6 58.5

测试数据是:

7 69.0
8 78.8
9 89.0

我们需要根据训练数据计算出我们的参数a和b。 然后使用我们计算出来的a和b预测测试数据,比较F(x)和实际数据的差距,如果误差小到一定程度,说明我们学习到的参数是正确的,比如和实际数据的差距都小于5% 。

如果满足条件说明参数正确,否则说明参数不够精确,需要进一步学习,这个差距,我们称之为误差(Loss)。现在我们来看一下在这个模型(简单的一元一次线性函数)下如何学习这两个参数

比如我们可以采用这种学习策略 1.首先a和b都假定为整数,假定a的范围是[-10, 10]这个区间,b的范围是[-100, 100]这个区间 2.遍历所有的a和b的组合,使用a和b计算ax + b,x取每个训练数据的输入数据,评估计算结果精确性的方法是计算结果和训练数据结果的差的绝对值除以训练数据结果,也就是 Loss = |F(x) - Y| / Y 3.计算每个组合的Loss的平均值,取平均Loss最小的为我们假定的“学习结果” 。

现在我们就得到了a和b,并且这个a和b是在我们给定范围里精度最高的参数,我们用这个a和b去训练数据里面计算平均的 |F(x) - Y| / Y,如果平均Loss小于 5%,说明这个a和b是符合我们精度的。否则我们需要优化我们的学习策略。

这种朴素的基于归纳学习的机器学习方法可以分为以下几步:

  1. 预先定义一个模型
  2. 根据模型制定学习策略
  3. 使用学习策略使用模型来学习(拟合)训练数据,得到该模型中的所有参数
  4. 使用测试数据评估模型是否精确。如果不够精确则根据学习策略继续学习。如果足够精确,我们就认为机器学习结束了。
  5. 最后我们可以得到模型和参数,这就是我们学到的结果,也就是那个用来预测的函数。

这里我们也要注意,上述步骤的前提是我们的模型是可以收敛的,如果模型本身就是发散的,那么我们就永远得不到我们的结果了。

机器学习与实时处理系统

传统的机器学习是一种批处理式的方法,在这种方法下,我们需要预先准备好所有的训练数据,对训练数据进行精心组织和筛选,很多情况下还需要对数据进行标记(监督式学习),而训练数据的组织会对最后的训练结果产生相当大的影响。

在这种算法中我们要处理完所有数据后才能更新权重和模型。

但现在出现了许多在线学习算法,这种算法可以对实时输入的数据进行计算,马上完成权重和模型更新。

一方面我们可以用于监督式学习(完成数据标记后马上加入训练),也可以用于大量数据的非监督式学习。

而在这种情况下,实时处理系统就可以大展身手了。在线系统和实时处理系统可以确保实时完成对数据的学习,利用实时新系统。

实现思路如下图所示:

图片描述

这里我们可以看到,系统接收来自其他系统的实时输入,然后实时处理系统中使用在线算法快速处理数据,实时地更新模型权重信息。

纯粹的在线算法可能并不适合许多情景,但是如果将部分在线算法和传统的批处理式算法结合,将会起到非常好的效果。而且许多数据分析工作确实可以通过这种方式完成一部分处理,至少是预处理。

目前机器学习的趋势就是对精度和速度的要求越来越高,方法越来越复杂,而数据越来越多,计算量越来越大,如果没有足够的计算资源,不一定能够在有限时间内完成足够的学习,因此现在类似于Tensorflow之类的机器学习解决方案都会提供针对分布式的支持。而大数据场景下的机器学习也变得越来越重要,这也对我们的分布式计算与存储方案提出了严峻的挑战。

分布式计算拓扑搭建

现在我们来看一个现实工程中常常会遇到的问题。

我们在开发实际系统时常常会收集大量的用户体验信息,而我们常常需要对这些体验信息进行筛选、处理和分析。那么我们应该如何搭建一个用于实时处理体验信息的分布式系统呢?

我们先来看一下整体流程:

图片描述

  1. 收集体验信息

业务系统调用体验信息接口,将体验信息信息异步写入到特定的文件当中。使用永不停息的体验信息检测程序不断将新生成的体验信息发送到数据处理服务器。

  1. 处理体验信息

首先数据处理服务器的体验信息接收负责将体验信息写入本地的Redis数据库中。然后我们使用消息源从Redis中读取数据,再将数据发送到之后的消息处理单元,由不同的数据处理单元对体验信息进行不同处理。

  1. 存储结果

消息处理单元完成体验信息处理之后,将体验信息处理结果写入到Cassandra数据库中,并将体验信息数据写入到Elasticsearch数据库中。

其中关键的部分就是图中用长方形框出来的部分,该部分的作用是完成对数据的筛选、处理和基本分析。这部分我们将其称作计算拓扑,也就是用于完成实际计算的部分。

我们接下来阐述一下每一步具体如何做。

收集体验信息

图片描述

收集体验信息分为以下几步:

  1. 程序通过体验信息接口将体验信息写入体验信息文件中。我们假设程序会使用非阻塞的异步写入接口,体验信息接口的调用方只是将体验信息送入某个队列中,然后继续向下执行。
  2. 接着体验信息写入线程从消息队列中读取数据,并将体验信息数据写入到真正的体验信息文件中。
  3. 写入后,某一个体验信息代理程序会不断监视体验信息文件的改动,并将用户新写入的体验信息信息发送到体验信息处理服务器的体验信息收集服务接口上。
  4. 体验信息收集服务接口是整个服务的对外接口,负责将其他节点发送的体验信息信息送入集群内部的Redis节点,并将体验信息数据写入到Redis的列表中。至此为止,体验信息收集过程就完成了。

处理体验信息

图片描述

接下来是处理体验信息,处理体验信息主要在计算拓扑中完成。分为四步:

  1. 体验信息处理消息源:负责监视Redis列表的改变,从Redis列表中读取体验信息规则,并将体验信息规则文本转换成计算拓扑的内部数据格式,传送到下一个体验信息处理单元。
  2. 体验信息规则引擎:使用体验信息规则引擎对体验信息进行处理和过滤。这一步是可选的,也就是用户可以加入自己的消息处理单元对收集的体验信息进行处理。这将会影响到发送到后续的消息处理单元(索引器和计数器)中的体验信息消息。这一步我们就不做处理了,如果读者感兴趣可以自己加入一个或者多个消息处理单元对体验信息进行处理。
  3. 索引:这一步必不可少,用于将体验信息规则引擎输出的体验信息写入到ElasticSearch中,并便于用户日后检索这些体验信息。这里涉及到一步——将体验信息规则元组转换成JSON,并将JSON写入ElasticSearch。
  4. 统计:这一步也非常重要,用于对体验信息进行计数,这一步会将体验信息计数结果写入Cassandra的对应表中。便于用户获取统计信息。

存储结果

图片描述

最后就是对计算结果的存储,我们需要使用存储模块将数据写入到不同的数据库中:

  1. ElasticSearch:该数据库用于存储被转换成JSON的原始体验信息信息。用户可以在ElasticSerach中检索体验信息。
  2. Cassandra:该数据库用于存储体验信息的统计计算结果。因为Cassandra支持原子计数列,因此可以非常胜任这个工作。

我们可以发现,在上面几步中,其他都可以使用现成的系统来完成任务,最关键的部分就是计算拓扑,计算拓扑需要高实时性地完成体验信息处理分析任务,这样才能应付大型系统中以极快速度产生的大量体验信息。

这里我们可以使用一个独立的计算集群来完成这个事情。每个计算节点负责完成一个计算任务,完成之后将数据传送给下一个计算节点完成后续的计算任务。每个计算节点都有一个消息队列用于接收来自上一级的消息,然后处理消息并继续将结果发送给下一级的计算节点。

消息处理的问题

这里我们通常关心三个问题:

  1. 如何确保所有数据都得到了处理。
  2. 如何组织消息(数据)的传递,为整个集群高效计算提供一个良好的I/O支持。
  3. 如何搭建这个计算拓扑并尽量高效地进行完成计算。

数据完全处理

我们先来看一下如何解决解决数据的完全处理问题。

我们这里讲每一个需要处理的数据(一条体验信息记录)组织成一个Tuple,也就是元组。每个计算节点都以Tuple为单位进行数据处理。每个元组都会有一个ack方法,用于告知上一级计算节点该Tuple已经处理完成。

我们以下面的方式处理Tuple,保证所有数据都会被完全处理:

  1. 首先给每个Tuple一个id(伪随机的64位id)。
  2. 由消息源发出的Tuple会有一个Acker,构造Tuple的时候会把新的Tuple加入这个Acker(就是包含这个Acker)。
  3. 每个节点处理完一个元组调用元组的ack方法,改变Acker内部的记录值,表示当前Tuple已经完成处理。
  4. 如果某个Acker中的所有Tuple都已经处理完成,那么这个Spout Tuple就已经处理完成。表明该消息源发出的Tuple被完全处理。
  5. 由于我们无法在Acker中记录下Tuple树,因此比较好的方式是实现一个基于异或的优化算法,该算法在Storm中得到了应用。其具体实现是:在Acker中设置一个ack id,每创建一个Tuple,将id与其异或,每ack一个Tuple时,将其与id做异或运算。这样当所有Tuple处理完成后,ack id为0,就可以知道所有元组处理完成。
  6. 如果消息源检测到某个其发出的Tuple没有在特定时间内得到处理,就会重发该元组。后续的计算节点重新开始处理。为了实现一个同时符合CAP的分布式系统,我们这里后续的计算节点并不会缓存计算结果,而是会重新开始计算上一级节点重发的元组,具体为什么这样做请参见How to beat the CAP theorem。

数据流量控制

第二个需要解决的问题就是数据流量控制问题。

我们可以设想一下,如果网络状况不好,在特定时间内有许多元组都没有得到处理,那么数据源节点就会重发许多Tuple,然后后续节点继续进行处理,产生更多的Tuple,加上我们需要正常处理的Tuple,使得集群中的Tuple越来越多。而由于网络状况不好,节点计算速度有限,会导致集群中积累的过多数据拖慢整个集群的计算速度,进一步导致更多的Tuple可能计算失败。

为了解决这个问题,我们必须想方设法控制集群中的流量。

这个时候我们就会采用一种流量背压机制。该机制借鉴自Twitter Heron。
这个机制的思想其实很简单,当每个计算节点处理 Tuple过慢,导致消息队列中挤压的Tuple过多时会向其他节点发送消息,那么所有向该节点发送消息的节点都会降低其发送消息的速度。经过逐级传播慢慢将整个集群的流量控制在比较合理的情况下。只不过这个算法具体如何实现有待我们继续研究。

拓扑结构设计

最后就是如何搭建这个拓扑,并尽量高效地完成计算了。

在分布式实时处理系统领域,目前最为成功的例子就是Apache Storm项目,而Apache Storm采用的就是一种流模型。而我们的Hurricane则借鉴了Storm的结构,并进行了简化(主要在任务和线程模型上)。

这种流模型包括以下几个概念:

  1. 拓扑结构:一个拓扑结构代表一个打包好的实时应用程序,相当于Hadoop中的一个MapReduce任务。但是和MapReduce最大的不同就是,MapReduce最后会停止,相当于任务处理结束,而拓扑结构则会持续执行,永不停息,除非你手动停止。因此任何时刻流入的数据流都会被拓扑结构迅速处理。
  2. 流:一个流是拓扑结构中由元组组成的无限的序列,通常是由一个元组经过不同的处理单元处理之后产生的。每一个流入拓扑结构中的数据都会产生一个流。
  3. 元组:元组是在流中传输的数据,数据源会将输入的数据转成元组输入到拓扑结构中,而数据处理单元会处理上一级的元组并产生新的元组传给下一级的数据处理单元。元组中支持存储不同类型的数据。
  4. 消息源:消息源是拓扑结构中数据流的源头。通常其任务是读取外部数据源输入,并产生元组输入拓扑结构中。可靠的数据源可以确保消息完全得到处理,并在合适的时候重发元组。
  5. 数据处理单元:数据处理单元是拓扑结构中负责处理数据的部分,你可以在其中筛选数据,统计数据,拼接数据等等。
  6. 数据处理单元会接收来自上一级的元组,并经过处理得到下一级的元组。每个数据处理单元会向上一级确认其元组有没有得到正确处理,如果数据源发现固定时间内并不是全部元组都被处理完了,就会重发元组。
    为了支撑这套模型,我们设计了Hurricane的架构,该架构如下图所示:

图片描述

其中有以下几个组件:

  1. 最上方的是President,这是整个集群的管理者,负责存储集群的所有元数据,所有Manager都需要与之通信并受其控制。
  2. 下方的是多个Manager,每个Manager中会包含多个Executor,每个Executor会执行一个任务,可能为Spout和Bolt。

从任务的抽象角度来讲,每个Executor之间会相互传递数据,只不过都需要通过Manager完成数据的传递,Manager会帮助Executor将数据以元组的形式传递给其他的Executor。

Manager之间可以自己传递数据(如果分组策略是确定的),有些情况下还需要通过President来得知自己应该将数据发送到哪个节点中。

了解整体架构后,我们来具体讲解一下President和Manager的架构。

President的架构如图所示:

图片描述

President的底层是一个基于Meshy实现的NetListener,该类负责监听网络,并将请求发送给事件队列,交由事件队列处理。

President的核心是EventQueue。这是一个事件队列,当没有计算任务的时候,会从事件队列中获取事件并进行处理。

用户需要在EventQueue中事先注册每个事件对应的处理函数,President会根据事件类型调用对应的事件处理函数。

图片描述

接下来是Manager的架构。Manager的架构相对来说较为复杂。考虑到性能优化等问题,这个架构修改了几次。

首先,最顶层和President一样,是一个事件队列,并使用一个基于Meshy的NetListener来完成IO事件的响应(转换成事件放入事件队列)。

接下来有两个模块:

  1. Metadata Manager,该线程负责监听EventQueue,接收元数据的同步事件,负责和President同步集群的元数据。
  2. Tuple Dispatcher,该线程负责响应OnTuple事件,接收其他节点发过来的元组,并将元组分发到响应的Bolt Executor的元组队列中。

再下一层就是Executor。Executor分为SpoutExecutor和BoltExecutor,每个Executor都是一个单独的线程,在系统初始化Topology的时候,Managert会初始化Executor,并设置其中的任务。SpoutExecutor负责执行Spout任务,而BoltExecutor负责执行Bolt任务。

其中BoltExecutor需要接受来自其他Executor的Tuple,因此包含一个Tuple Queue。Tuple Dispatcher会将Tuple投送到这个Tuple Queue中,而Bolt则从Tuple Queue中取出数据并执行任务。

Eexecutor在执行完任务后,可能会将Tuple通过OutputCollector投送到OutputQueue中。我们又设计了一个OutputDispatcher,从OutputQueue中获取Tuple并发送到其他节点。OutputQueue也是一个带锁的阻塞队列,是唯一用于输出的队列。

现在我们来详细介绍一下Hurricane的基本组件。

Task

Task是对计算任务的统一抽象,规定了计算任务的统一接口。Spout和Bolt都是Task的特殊实现。

Task包含三个接口函数:

  1. Prepare,用于完成任务对象的初始化。
  2. Cleanup,用于清理任务对象。Executor会在关闭任务时执行该函数。
  3. DeclareFields,该方法用于声明该任务输出的Tuple的字段列表。

Spout

Spout是Task的特例,任务用于产生待处理的元组。因此除了Task的接口以外,还增加了两个新接口。

  1. Clone,用于生成Spout对象的拷贝。
  2. NextTuple,Executor会反复调用该函数生成元组。

这里需要注意一下,Hurricane会有简单的背压机制,当Bolt检测到Tuple流量过大的时候,会向Spout进行反馈,Spout会随之降低其发送元组的速度。

如果Bolt处理速度大于Tuple的生成速度,Bolt又会向Spout反馈增加流量,Spout会放松流量限制。

Bolt

Bolt是计算单元,负责处理来自其他Spout和Bolt的元组。Bolt同样是特殊的Task,因此除了Task接口外,还有两个新的接口。

  1. Clone:用于拷贝生成新的Bolt对象
  2. Execute:用于处理来自其他任务的元组,Executor每拿到一个元组就会调用该函数进行处理。

Bolt接收元组的方式我们称之为分组方式。目前hurricane支持3种分组方式。

  1. Global:在该分组方式中,Executor会在第一次发送元组的时候随机选择一个目标任务,并记录下该任务,之后都使用该目标任务。
  2. Random:在该分组方式中,Executor每次都会重新随机选择一个目标任务。
  3. Field:在改分组方式中,Executor每次都会根据元组的特定字段选择目标任务,如果没有记录则会向President询问其目标节点,如果有记录则直接发送。

Executor

Executor负责执行具体的任务。每个Executor是一个独立的线程,可以充分利用多核和多线程的CPU。为了简化模型,每个Executor只负责执行一个任务。

  1. SpoutExecutor负责执行Spout任务,会持续不断地执行Spout。
  2. BoltExecutor负责执行Bolt任务,该Executor会不断从TupleQueue中获取元组,并调用Bolt处理元组。

如果TupleQueue中不包含元组,BoltExecutor会被阻塞。超过一定时间没有获取到元组,BoltExecutor会向Spout反馈,解除部分流量限制,加快元组生成速度。

其他特性

Squared

图片描述

介绍完Hurricane的基本功能与架构之后,我们来介绍一下Squared。

首先我们解释一下Squared是什么?

图片描述

左侧是Hurricane基本的计算模型,在该计算模型中,系统是一个计算任务组成的网络。我们需要考虑每个节点的琐屑实现。

但如果在日常任务中,使用这种模型相对来说会显得比较复杂,尤其当网络非常复杂的时候。

为了解决这个问题,看一下右边这个计算模型,这是对我们完成计算任务的再次抽象。

  1. 第一步是产生语句的数据源
  2. 然后每条语句需要使用名为SplitSentence的函数处理将句子划分为单词
  3. 接下来根据单词分组,使用CountWord这个统计操作完成单词的计数。

这里其实是将网络映射成了简单的数据操作流程。这样一来,解决问题和讨论问题都会变得更为简单直观。

这就是Squared所做的事情——将基于网络与数据流的模型转换成这种简单的流模型,让开发者更关注于数据的统计分析,脱离部分繁琐的工作。

保序

在现实的工作中,我们常常需要一个的特性就是保序。

比如部分银行交易和部分电商订单处理,希望数据按照顺序进行处理,但是传统的数据处理系统往往不支持这个特性。所以我们就实现了保序功能。

保序的实现原理很简单,首先每个Tuple会一个一个orderId字段,orderId是依据顺序生成的,然后所有对Tuple的操作都会检验该orderId之前的Tuple是否已经完成。

如果已经完成则处理该Tuple,否则就将Tuple放在一个队列里,等待前面的Tuple处理完毕为止。

多语言支持

最后一部分就是多语言支持。

毋庸置疑的是,一个庞大复杂的实际系统不可能整个系统都使用C++编写。

首先就是C++的入门门槛高,平均开发效率无法和其他语言相比。其次,现在大部分的Web应用都是使用Java或者脚本语言开发,因此我们必须考虑Hurricane的多语言接口问题。

为此,Hurricane的思想是以基本的C++为后端,然后在C++上面封装其他语言的接口。此外还提供Bolt和Spout的实现接口,让其他语言可以直接编写计算组件。
当用户希望使用其他语言快速实现部分新的算法和模型的时候,这种特性就会非常有用。

展望

目前该框架已经能够处理日常所需工作。该框架会在之后的时间中继续完善架构,完善并优化我们的系统实现,比如完全实现高层抽象Squared和保序机制等。

除此以外,由于现在有许多计算任务需要使用基于向量和矩阵的浮点计算,因此我们计划开发一个Hurricane的子项目——SewedBLAS。这是一个BLAS库的高层抽象,我们希望整合大量的BLAS库,比如使用CPU的MKL/OpenBLAS,使用GPU的CUDA和ACML,构建一个易于使用、跨平台的高性能线性代数库,并与Hurricane进行深度整合,力求在分布式和科学计算、深度学习找到最好的切合点,并充分吸收整合其他现有的分布式机器学习框架,减少从科研到产品的转换难度。


编辑推荐:架构技术实践系列文章(部分):

评论