返回 登录
0

Ray:为人工智能而生的分布式执行框架

原文:Ray: A Distributed Execution Framework for AI Applications
作者:Robert Nishihara
翻译:黑色巧克力

译者注:文章介绍了服务人工智能的开源框架Ray,并借助代码示例说明了它的特点和优势。

Ray,一个在集群和大型多核机器上高效运行Python代码的框架。可以查看相关代码文档

许多人工智能算法在计算上都非常密集,并且显示出复杂的通信模式。为此许多研究人员将大部分时间花在构建定制系统上,以高效地在集群中分发代码。

然而,定制的系统通常是基于特定的单一算法或算法类。因此我们构建了Ray来帮助消除一堆冗余的工程任务,这些任务目前在每个新算法中反复出现。我们希望能够重用一些基本的基础元素来实现并高效地执行各种算法和应用程序。

现有代码的简单并行化

Ray允许通过最少的修改来远程执行Python函数。

使用常规的Python时,在函数未执行完毕之前,如果调用函数,那么会被阻塞。下面这个例子的执行时间为8秒:

  def f():
    time.sleep(1)
  # Calls to f executed serially.
results = []
for _ in range(8):
    result = f()
    results.append(result)

对于Ray,当调用远程函数时会立即返回一个future(称之为对象IDs)。接着创建一个任务,然后调度它,并在集群的某个地方去执行。下面的例子只需要1秒即可完成。

@ray.remote
def f():
    time.sleep(1)

# Tasks executed in parallel.
results = []
for _ in range(8):
    result = f.remote()
    results.append(result)

results = ray.get(results)

注意,惟一的变化是将@ray.remote装饰器添加到了函数定义中,通过f.remote调用该函数,并且在对象IDs列表中调用ray.get(记住对象IDs是未来),以便阻塞进程直到相应的任务执行完成。


image

这是描述任务和对象的图表。圆圈表示任务,而方框表示对象。8个单独的任务之间没有箭头,表示所有的任务可以并行执行。

任务依赖的灵活编码

与MapReduce或Apache Spark这样的批量同步并行框架相比,Ray的设计目的是支持需要细粒度任务依赖的人工智能应用程序。它与整个数据集的总统计量的计算不同,训练过程可以对一小部分数据进行操作,也可以对少数任务的输出进行操作。

依赖项可以通过将对象IDs(任务的输出)传递到其他任务来进行编码。

import numpy as np

@ray.remote
def aggregate_data(x, y):
    return x + y

data = [np.random.normal(size=1000) for i in range(4)]

while len(data) > 1:
  intermediate_result = aggregate_data.remote(data[0], data[1])
  data = data[2:] + [intermediate_result]

result = ray.get(data[0])

通过将一些调用的输出传递给aggregate_data,随后调用aggregate_data,然后对这些任务之间的依赖进行编码,这些任务可以由系统使用,从而制定调度决策,并协调对象的传输。注意,当将对象IDs传递到远程函数调用时,实际的值将在函数执行之前被解压,因此当执行aggregate_data函数时,x和y将是numpy数组。


image

这是描述任务和对象的图表。圆圈表示任务,而方框表示对象。箭头表明从任务到它们产生的对象或者从对象到依赖于它们的任务。

与参与者共享可变状态

Ray使用参与者在任务之间共享可变状态。下面的例子中多个任务共享Atari模拟器状态。每个任务通过运行模拟器完成前面任务所遗留下的几个步骤。

import gym

@ray.remote
class Simulator(object):
    def __init__(self):
        self.env = gym.make("Pong-v0")
        self.env.reset()

    def step(self, action):
        return self.env.step(action)

# Create a simulator, this will start a new worker that will run all
# methods for this actor.
simulator = Simulator.remote()

observations = []
for _ in range(4):
    # Take action 0 in the simulator.
    observations.append(simulator.step.remote(0))

每次调用simulator.step.remote生成一个在参与者上调度的任务。这些任务会改变模拟器对象的状态,并且每次执行一个。

与远程函数一样,参与者的方法返回对象IDs(也就是future),这些对象可以被传递到其他任务中,并且可以用ray.get来检索它们的值。


image

这是描述任务和对象的图表。圆圈表示任务,而方框表示对象。第一个任务是参与者的构造函数。粗箭头用于显示在这个参与者上调用的方法共享参与者的底层状态。

等待完成任务的子集

有时,当运行带有变量持续时间的任务时,不希望等待所有的任务完成。相反,可能希望等待一半的任务完成,或者使用完一秒后完成的任务。

@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))

# Launch 10 tasks with variable durations.
results = [f.remote() for _ in range(10)]

# Wait until either five tasks have completed or two seconds have passed and
# return a list of the object IDs whose tasks have finished.
ready_ids, remaining_ids = ray.wait(results, num_returns=5, timeout=2000)

在本例中,ready_ids是一个对象IDs列表,它对应的任务已完成执行,而remaining_ids是剩余的对象IDs列表。

这段原始代码使实现其他行为变得很容易,例如希望按照完成的顺序来处理某些任务。

# Launch 10 tasks with variable durations.
remaining_ids = [f.remote() for _ in range(10)]

# Process the tasks in the order that they complete.
results = []
while len(remaining_ids) > 0:
    ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
    results.append(ray.get(ready_ids[0]))

注意,可以简单的修改上面的示例,以便上一个任务完成时自适应地启动新任务。

使用Apache Arrow高效共享内存和序列化

序列化和反序列化数据常常是分布式计算的瓶颈。Ray让同一机器上的工作进程通过共享内存访问相同的对象。为了达到这一目的,Ray把内存中的对象存储在每台机器上为对象服务。

为了说明问题,假设我们创建了一些神经网络权重,并希望将它们从一个Python进程传送到另一个Python进程。

import numpy as np

weights = {"Variable{}".format(i): np.random.normal(size=5000000)
           for i in range(10)}  # 2.68s

为了将神经网络的权重传输到周围,首先需要将它们序列化成一个连续的字节块。这可以通过像pickle这样的标准序列化库来完成。

import pickle

# Serialize the weights with pickle. Then deserialize them.
pickled_weights = pickle.dumps(weights)      # 0.986s
new_weights = pickle.loads(pickled_weights)  # 0.241s

反序列化所需的时间尤为重要,因为机器学习最常见的一种模式是在单一的过程中聚集大量的值(例如神经网络权重,转出,或其他值),因此连续的反序列化步骤可能发生数百次。

为了减少在共享内存中对象进行反序列化所需要的时间,我们使用Apache Arrow数据布局。这使我们能够在不扫描整个blob的情况下,计算对序列化的blob的偏移量。在实践中还可以转换成反序列化,达到几个数量级的速度。

# Serialize the weights and copy them into the object store. Then deserialize
# them.
weights_id = ray.put(weights)      # 0.525s
new_weights = ray.get(weights_id)  # 0.000622s

使用Arrow调用ray.put序列化权重,并将结果复制到对象存储的内存中。然后调用ray.get对序列化的对象进行反序列化,并构造一个新的numpy字典数组。然而支持numpy数组的底层数组仍然存在于共享内存中,并且不会被复制到Python进程的堆中。

注意如果一台不同的机器调用ray.get,那么相关的序列化对象将从一台常用机器上复制到另一台需要的机器上。

这个例子中,我们指明调用ray.put。然而通常情况下,只有当将Python对象传递到远程函数或从远程函数返回时,这个调用才会发生。

期待您的反馈

该项目处于早期阶段,如果你愿意尝试运用,我们非常乐意听到你的想法和建议。

评论