pytorch分布式训练
pytorch 分布式解决方案尝试:
- nn.Dataparallel 简单解决方案:适合于单节点多 GPU 并行
- nn.parallel.DistributedDataParallel 并行 + torch.utils.data.distributed.DistributedSampler 数据 + torch.distributed 配置
- horovod: Uber 开源外部解决方案
典型的两种方式: Map-reduce 和 Ring all reduce
几种通信方式:
方式一: nn.DataParallel
nn.DataParallel 为单进程,多线程。使用起来比较简单,相应的效率也较低, 适用于单节点多 GPU 的情况。nn.DataParallel 会将模型复制到不同的 GPU 上。 对于每个训练批次 batch, 会将其切分到不同的 GPU 上进行正向传播并将梯度汇总(默认是第一个 GPU)。其函数原型为:
class torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
# device_ids: 指定参与训练的 GPU
# output_device: 用于梯度汇总的 GPU
注意点:
(1)效率低下的原因:在每个训练批次(batch)中,会将梯度汇总然后分发到每个 GPU 上,所以网络通信就成为了一个瓶颈,并且会带来显存利用不平衡的问题。
(2)使用 nn.DataParallel 会在原有的模型外边包装一个 module, 加载模型的时候需要进行适当的拆包。
(2) 当没有指定 device_ids 的时候,程序会自动找到这个机器上面可以用的所有的显卡, 然后用于训练。可以通过:
os.environ['CUDA_VISIBLE_DEVICES']
事先指定可用 GPU, 防止 GPU 冲突。
方式二:torch.nn.parallel.DistributedDataParallel
torch.nn.parallel.DistributedDataParallel 是 pytorch 推荐的并行方案。它支持多机多卡训练,显存分配更加均衡,且效率较高。
Step1: 环境配置
确定网络的接口, 通常会自己寻找网络接口,当没有自己寻找到的时候,需要手动进行配置:
import os
os.environ['NCCL_SOCKET_IFNAME'] = 'enp2s0' # for NCCL
os.environ['GLOO_SOCKET_IFNAME'] = 'enp2s0' # for GLOO
step2: 确定相关配置
后端: 后端支持 gloo、nccl 和 mpi 三种方式。在使用 CPU 进行分布式训练的时候,优先选择 gloo, 在使用 GPU 进行分布式训练的时候, 优先选择 nccl。
NCCL:NCCL 的全称为 NVIDIA Collective Communications Library ,是一个可以实现多个 GPU 、多个结点间聚合通信的库,在 PCIe、Nvlink、InfiniBand 上可以实现较高的通信速度。NCCL 对 GPU 均有较好支持,且 torch.distributed 对其也提供了原生支持。
MPI
MPI:即消息传递接口 (Message Passing Interface),是一个来自于高性能计算领域的标准的工具。它支持点对点通信以及集体通信,并且是 torch.distributed 的 API 的灵感来源。MPI 后端的优势在于,在大型计算机集群上,MPI 应用广泛,且高度优化。 但是,torch.distributed 对 MPI 并不提供原生支持。因此,要使用 MPI,必须从源码编译 Pytorch。是否支持 GPU,视安装的 MPI 版本而定。
Gloo:Gloo后端支持 CPU 和 GPU,其支持集体通信(collective Communication),并对其进行了优化。torch.distributed 对 gloo 提供原生支持,无需进行额外操作。
init_method: 初始化
init_method
的方法有两种,一种是使用 TCP 进行初始化,另外一种是使用 共享文件系统 进行初始化。并不推荐使用共享文件系统进行初始化。 TCP 初始化的格式为:tcp://ip:端口号
rank 和 world_size
你需要确保, 不同机器的
rank
值不同, 并且需要注意: 主机的rank
必须为0,而且使用init_method
的 ip 一定是rank
为 0 的主机,world_size
是你的主机数量, 不能随便设置这个数值, 当参与训练的主机数量达到world_size
的设置值时, 代码才会执行的.
parser = argparse.ArgumentParser()
parser.add_argument('-bk',
'--backend', type=str, default='nccl', help='Name of the backend to use.')
parser.add_argument('-im',
'--init-method',
type=str,
default='env://',
help='URL specifying how to initialize the package.')
parser.add_argument('-ws', '--world-size', type=int, default=1, help='Number of processes participating in the job.')
parser.add_argument('-r', '--rank', type=int, default=0, help='Rank of the current process.')
args = parser.parse_args()
distributed.init_process_group(
backend=args.backend,
init_method=args.init_method,
world_size=args.world_size,
rank=args.rank,
)
step 3: 加载数据,进行训练
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
model = nn.parallel.DistributedDataParallel(model)
# ...
step4: 在命令行执行如下命令:
机器1:
python3 -m torch.distributed.launch torch_ddp.py -bk nccl -im tcp://10.10.10.1:12345 -rn 0 -ws 2
机器2:
python3 -m torch.distributed.launch torch_ddp.py -bk nccl -im tcp://10.10.10.1:12345 -rn 1 -ws 2
方式三: Horovod
Horovod 是 Uber 开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与 Horovod 设备之间的通信模式很像,有以下几个特点:
- 兼容 TensorFlow、Keras 和 PyTorch 机器学习框架。
- 使用 Ring-AllReduce 算法,对比 Parameter Server 算法,有着无需等待,负载均衡的优点。
- 实现简单,容易上手。(划重点)
step1: 简单设置
import torch
import horovod.torch as hvd
# init horovod
hvd.init()
# 给当前进程分配对应的 gpu,local_rank() 返回的是当前是第几个进程
torch.cuda.set_device(hvd.local_rank())
step2: datasets、model、optimizer
# dataset ...
train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# model ...
model = ...
model.cuda()
# optimizer ...
optimizer = optim.SGD(model.parameters())
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
step3: 训练
# 初始化的时候广播参数,这个是为了在一开始的时候同步各个 gpu 之间的参数
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
for epoch in range(100):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
epoch, batch_idx * len(data), len(train_sampler), loss.item()))
方式四: 混合精度 apex 搭配分布式
这样可以取得比较低的显存占比,同时训练时间也大幅缩减。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!