0. 前言


1. Setup

  • torch.distributedtorch.multiprocessing
    • 两者应该都提供了方法,可以令任意两个进程之间进行通信。
    • 前者支持不同的backend,因此支持不同机器上进程的通信。
  • 为了展示功能,首先要建立多进程运行环境
    • 下面的代码创建了两个进程的分布式运行环境。
    • 其中,init_process 确保每个进程都与同一个master进行协作。
    • coordination tool 是啥?距离说有pdsh/clustershell/slurm
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

2. 点对点通信

  • 所谓点到点通信,指的是数据从一个进程传输到另一个进程。
    • image_1eel2trk747m1h1c17fv6fn9089.png-15.2kB
  • 实现主要有blocking与non-blocking两种实现.
    • 前者指的是,两个进程都在通信完成之前,都会阻塞。
    • 后者指的是,两个进程在调用完isend/irecv后直接结束,但返回一个Work对象,可以执行worker.wait()方法实现类似阻塞的功能。
  • non-blocking 方式要非常注意
    • worker.wait() 执行完成之前,不应该向对应的tensor执行操作。
    • 在调用了 dist.isend() 后再向对应的 tensor 写入数据会导致未知错误。
    • 在调用了 dist.irecv() 后再想对应的 tensor 读取数据会导致未知问题。
  • 点对点通信适合用于进程间通信的精细控制。
"""Blocking point-to-point communication."""
def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

"""Non-blocking point-to-point communication."""
def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

3. Collective Communication

  • 所谓 Collective Communication,指的是集体通信,运行一组进程间的相互通信。形式有很多种,如下图中的 scatter/gather/reduce/all-reduce/broadcast/all-gather
    • image_1eel39eo71a0fkgv1scd1dcq1b8d1t.png-153.6kB
  • 组(Group)
    • 所谓一组(group),就是所有进程的子集。
    • 要建立group,可以通过 dist.new_group(group) 来实现。
  • 默认情况下,collectives是在所有进程间进行通信,也就是所谓的 world
    • 例如,为了获取所有进程中所有tensor的和,就可以调用 dist.all_reduce(tensor, op, group)
""" All-Reduce example."""
def run(rank, size):
    """ Simple point-to-point communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])
  • 支持的操作有
    • dist.broadcast(tensor, src, group): Copies tensor from src to all other processes.
    • dist.reduce(tensor, dst, op, group): Applies op to all tensor and stores the result in dst.
    • dist.all_reduce(tensor, op, group): Same as reduce, but the result is stored in all processes.
    • dist.scatter(tensor, src, scatter_list, group): Copies the ith tensor scatter_list[i] to the ith process.
    • dist.gather(tensor, dst, gather_list, group): Copies tensor from all processes in dst.
    • dist.all_gather(tensor_list, tensor, group): Copies tensor from all processes to tensor_list, on all processes.
    • dist.barrier(group): block all processes in group until each one has entered this function.
  • reduce支持的op有
    • dist.reduce_op.SUM
    • dist.reduce_op.PRODUCT
    • dist.reduce_op.MAX
    • dist.reduce_op.MIN

4. 分布式训练

  • 目标就是实现类似 DistributedDataParallel 的功能。
    • 主要思路就是:将输入数据拆分为若干个split分别传送给每个进程(GPU),分别执行前向与反向操作,之后将反向获得的梯度汇总求平均得到最终梯度值,并将梯度值传递给每个进程分别进行参数更新。
  • 下面的代码实现对数据集进行拆分,并给出了MNIST的实例
""" Dataset partitioning helper """
class Partition(object):
    def __init__(self, data, index):
        self.data = data
        self.index = index
    def __len__(self):
        return len(self.index)
    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]

class DataPartitioner(object):
    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]
    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz
  • 实现分布式训练,包括了前向、反向、平均梯度等操作
""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size
  • 实现 Ring-Allreduce
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

5. 进阶内容

5.1. 通信后端

  • 了解更多GPU间通信的方法,如MPI/Gloo
  • torch.distributed 的一个非常优雅的地方在于,建立了抽象,可以使用不同的backends。
  • 常用的后端有三种:Gloo/NCCL/MPI,具体对比在这里有详细描述
  • Gloo
    • 官方Github
    • 使用非常方便,因为在pytorch的二进制包中就包括了Gloo。
    • 支持CPU中的点到点通信以及集体通信,GPU中的集体通信。
    • 但CUDA tensors的集体通信并没有像NCCL那样被优化过。
  • MPI
    • torch.distributed 包重要就是根据MPI设计的。
    • MPI的实现有很多,入Open-MPI/MVAPICH2/Intel MPI等,都为了不同的目的设计。
    • 使用MPI的主要优势在于MPI本身有通用性。
    • 但是,PyTorch二进制包中并没有MPI实现,所以需要自己bian’yi
  • NCCL
    • 提供了一种GPU Tensors之间集体通信的优化实现。
    • 如果只使用CUDA Tensor之间的集体通信,那NCCL作为backend是最合适的。

5.2. 初始化方法

  • 学习如何设置 dist.init_process_group() 方法的参数。

  • 相关环境变量(设置下面四个环境变量后,所有进程都能与master进行通信了,从而获取其他进程的信息,实现相互通信)

    • MASTER_PORT:rank 0 指定的端口
    • MASTER_ADDR:rank 0(或者说master)的IP地址
    • WORLD_SIZE:进程数量(也可以说是GPU数量?)
    • RANK:每个进程的rank(编号?),从而判断他们是不是某个worker的master。
  • 共享文件系统

    • 要求所有进程都有某个共享文件系统的权限,通过共享文件协调工作。
    • 也就是说,每个进程都可以打开文件,写入自身信息,等待进程执行同样的操作。当所有进程都执行完毕后,此时信息就实现了共享。
    • 为了防止资源竞争,可以使用fcntl来实现文件锁。
  • TCP

    • 通过提供rank0(master)的ip地址与可用端口,就可以实现TCP通信。
    • 所有workers都会连接到rank0进程,交换数据,然后传递给别的进程。
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐