pytorch分布式训练

pytorch 分布式解决方案尝试:

  • nn.Dataparallel 简单解决方案:适合于单节点多 GPU 并行
  • nn.parallel.DistributedDataParallel 并行 + torch.utils.data.distributed.DistributedSampler 数据 + torch.distributed 配置
  • horovod: Uber 开源外部解决方案

典型的两种方式: Map-reduce 和 Ring all reduce

几种通信方式:

方式一: nn.DataParallel

​ nn.DataParallel 为单进程,多线程。使用起来比较简单,相应的效率也较低, 适用于单节点多 GPU 的情况。nn.DataParallel 会将模型复制到不同的 GPU 上。 对于每个训练批次 batch, 会将其切分到不同的 GPU 上进行正向传播并将梯度汇总(默认是第一个 GPU)。其函数原型为:

class torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
# device_ids: 指定参与训练的 GPU
#  output_device: 用于梯度汇总的 GPU

注意点:

(1)效率低下的原因:在每个训练批次(batch)中,会将梯度汇总然后分发到每个 GPU 上,所以网络通信就成为了一个瓶颈,并且会带来显存利用不平衡的问题。

(2)使用 nn.DataParallel 会在原有的模型外边包装一个 module, 加载模型的时候需要进行适当的拆包。

(2) 当没有指定 device_ids 的时候,程序会自动找到这个机器上面可以用的所有的显卡, 然后用于训练。可以通过:

os.environ['CUDA_VISIBLE_DEVICES'] 事先指定可用 GPU, 防止 GPU 冲突。

方式二:torch.nn.parallel.DistributedDataParallel

torch.nn.parallel.DistributedDataParallel 是 pytorch 推荐的并行方案。它支持多机多卡训练,显存分配更加均衡,且效率较高。

Step1: 环境配置

确定网络的接口, 通常会自己寻找网络接口,当没有自己寻找到的时候,需要手动进行配置:

import os
os.environ['NCCL_SOCKET_IFNAME'] = 'enp2s0'   # for NCCL
os.environ['GLOO_SOCKET_IFNAME'] = 'enp2s0'   # for GLOO
step2: 确定相关配置
  • 后端: 后端支持 gloo、nccl 和 mpi 三种方式。在使用 CPU 进行分布式训练的时候,优先选择 gloo, 在使用 GPU 进行分布式训练的时候, 优先选择 nccl。

    NCCL:NCCL 的全称为 NVIDIA Collective Communications Library ,是一个可以实现多个 GPU 、多个结点间聚合通信的库,在 PCIe、Nvlink、InfiniBand 上可以实现较高的通信速度。NCCL 对 GPU 均有较好支持,且 torch.distributed 对其也提供了原生支持。

    MPI

    MPI:即消息传递接口 (Message Passing Interface),是一个来自于高性能计算领域的标准的工具。它支持点对点通信以及集体通信,并且是 torch.distributed 的 API 的灵感来源。MPI 后端的优势在于,在大型计算机集群上,MPI 应用广泛,且高度优化。 但是,torch.distributed 对 MPI 并不提供原生支持。因此,要使用 MPI,必须从源码编译 Pytorch。是否支持 GPU,视安装的 MPI 版本而定。

    Gloo:Gloo后端支持 CPU 和 GPU,其支持集体通信(collective Communication),并对其进行了优化。torch.distributed 对 gloo 提供原生支持,无需进行额外操作。

  • init_method: 初始化 init_method 的方法有两种,一种是使用 TCP 进行初始化,另外一种是使用 共享文件系统 进行初始化。并不推荐使用共享文件系统进行初始化。 TCP 初始化的格式为:tcp://ip:端口号

  • rankworld_size

    你需要确保, 不同机器的 rank 值不同, 并且需要注意: 主机的rank必须为0,而且使用 init_method的 ip 一定是 rank为 0 的主机,world_size 是你的主机数量, 不能随便设置这个数值, 当参与训练的主机数量达到 world_size 的设置值时, 代码才会执行的.

parser = argparse.ArgumentParser()
parser.add_argument('-bk',
                    '--backend', type=str, default='nccl', help='Name of the backend to use.')
parser.add_argument('-im',
                    '--init-method',
                    type=str,
                    default='env://',
                    help='URL specifying how to initialize the package.')
parser.add_argument('-ws', '--world-size', type=int, default=1, help='Number of processes participating in the job.')
parser.add_argument('-r', '--rank', type=int, default=0, help='Rank of the current process.')
args = parser.parse_args()

distributed.init_process_group(
    backend=args.backend,
    init_method=args.init_method,
    world_size=args.world_size,
    rank=args.rank,
)
step 3: 加载数据,进行训练
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

model = nn.parallel.DistributedDataParallel(model)

# ...
step4: 在命令行执行如下命令:

机器1:

python3 -m torch.distributed.launch torch_ddp.py  -bk nccl -im tcp://10.10.10.1:12345 -rn 0 -ws 2

机器2:

python3 -m torch.distributed.launch torch_ddp.py  -bk nccl -im tcp://10.10.10.1:12345 -rn 1 -ws 2

方式三: Horovod

Horovod 是 Uber 开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与 Horovod 设备之间的通信模式很像,有以下几个特点:

  1. 兼容 TensorFlow、Keras 和 PyTorch 机器学习框架。
  2. 使用 Ring-AllReduce 算法,对比 Parameter Server 算法,有着无需等待,负载均衡的优点。
  3. 实现简单,容易上手。(划重点)
step1: 简单设置
import torch
import horovod.torch as hvd

# init horovod
hvd.init()
# 给当前进程分配对应的 gpu,local_rank() 返回的是当前是第几个进程
torch.cuda.set_device(hvd.local_rank())
step2: datasets、model、optimizer
# dataset ...
train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

# model ...
model = ...
model.cuda()

# optimizer ...
optimizer = optim.SGD(model.parameters())
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
step3: 训练
# 初始化的时候广播参数,这个是为了在一开始的时候同步各个 gpu 之间的参数
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
       optimizer.zero_grad()
       output = model(data)
       loss = F.nll_loss(output, target)
       loss.backward()
       optimizer.step()
       if batch_idx % args.log_interval == 0:
           print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
               epoch, batch_idx * len(data), len(train_sampler), loss.item()))

方式四: 混合精度 apex 搭配分布式

这样可以取得比较低的显存占比,同时训练时间也大幅缩减。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!