Pytorch
Dataset
next(iter(dataloader))返回一个batch的数据 , 等价于IterableDataset
可以用 pytorch IterableDataset + python generator function(yield) 来解决,这样按需读取数据,常用的 dataset 是一次全部加载。 IterableDataset 因为按需读取,就没法用 len, index 等功能
Pytorch提供了两种思路去构建
Dataset
:Map式数据集: 将整个数据集读取到内存中,通过index映射的方式读取对应的数据,优点速度快,缺点占用内存,大的数据集是无法使用。
Iterable式数据集:无需将整个数据集读取到内存中,通过覆盖写
iter
迭代器的方式实现流的形式输入数据。无需满足内存大于整个数据集,也无需知道全部数据的大小。
简单来说,如果数据集较小时推荐尽量使用Map式Dataset,数据量过大、数据量未知、训练内存无法满足时只能使用Iterable
式构建Dataset。
- iterable Dataset 在分布式训练
在分布式时训练中数据并行的时,每块GPU都有一个独立的model和独立的进程(DDP模式)去训练完整数据的子集,在Pytorch中的DDP模式是通过DistributedSampler()
去实现在分布式并行训练时每个模型读取是整个数据集上不同部分,从而避免训练时取数据发生重复。比较坑的是:DistributedSampler()主要通过切片的方式将整个数据集分成独立的子集。Map 式 Dataset无需再多写代码直接可以适用。而iterable式 Dataset是无法切片。
注意,如果它是一个带有一些随机性的 torch.utils.data.IterableDataset
,并且你是以分布式方式进行训练的,你的 iterable dataset
要么使用一个内部的 attribute generator
,该generator
是一个 torch.Generator
用于随机化,且在所有进程上必须是相同的(并且 Trainer
将在每个 epoch
手动设置该 generator
的种子);要么有一个 set_epoch()
方法,在该方法内部设置所用随机数生成器的种子。
- pytorch DistributedSampler的实现
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore
else:
indices = list(range(len(self.dataset))) # type: ignore
from torch.utils.data import IterableDataset
class IterableDataset(IterableDataset):
def __init__(self, file_path):
self.file_path = file_path
def __iter__(self):
with open(self.file_path, "r") as f:
for line in f.readlines():
items = line.split("\t")
# process data
.......
yield {"input1": input1, "input2": input2, "input3": input3}
数据并行:因为求导以及加和都是线性的,数据并行在数学上也有效
- 一个dataset划分为若干个batch
- 每个GPU复制一份模型
- 将每个batch平均分配到所有GPU上并行运算
DP
- 使用
model = nn.DataParallel(model)
DataParallel(DP)
适用单机,不适用多机
优点:一行代码即可
缺点
- 负载不均衡,即存在主次模型(主模型需要整合其它次模型的梯度进行参数更新),主模型负载更大;
- 通信开销大
- 单进程多线程
- Global Interpreter Lock (GIL)全局解释器锁:一个 Python 进程只能利用一个 CPU kernel,即单核多线程并发时,只能执行一个线程。考虑多核,多核多线程可能出现线程颠簸 (thrashing) 造成资源浪费,所以 Python 想要利用多核最好是多进程
- 过程( 比如device[0]为主模型,其它为次模型)
- 过程一(图中红色部分):各卡分别计算损失和梯度
- 过程二(图中蓝色部分):所有梯度整合到 device[0]
- 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新
- 采用PS架构(Parameter Server)
- DP源码
class DataParallel(Module):
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
# 检查是否有可用的 GPU
device_type = _get_available_device_type()
if device_type is None:
self.module = module
self.device_ids = []
return
# 默认使用所有可见的 GPU
if device_ids is None:
device_ids = _get_all_device_indices()
# 默认 server 是 device_ids 列表上第一个
if output_device is None:
output_device = device_ids[0]
self.dim = dim
self.module = module
self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
# 检查负载是否平衡, 不平衡(指内存或者处理器 max/min > 0.75 会有警告)
_check_balance(self.device_ids)
# 单卡
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
def forward(self, *inputs, **kwargs):
# 没 GPU 可用
if not self.device_ids:
return self.module(*inputs, **kwargs)
# 运行前 GPU device_ids[0] (即我们的 server )上必须有 parallelized module 的parameters 和 buffers
# 因为 DP 保证 GPU device_ids[0] 和 base parallelized module 共享存储
# 所以在device[0] 上的 in-place 更新也会被保留下来,其他的则不会
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
# nice 现在 device[0] 上已经有了 module 和 input, 接下来我们就要开始 PS 算法了
# 可以开始看正文了
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# 如果仅有单卡可用,直接单卡计算,不用并行
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
return self.gather(outputs, self.output_device)
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
DDP
基本概念
group: 进程组,一般就需要一个默认的
world size: 所有的进程数量
rank: 全局的进程id
local rank:某个节点上的进程id
local_word_size: 某个节点上的进程数 (相对比较少见)
假设所有进程数即 world_size为W,每个节点上的进程数即local_world_size为L,则每个进程上的两个ID:
- rank的取值范围:[0, W-1],rank=0的进程为主进程,会负责一些同步分发的工作
- local_rank的取值:[0, L-1]
- 假定有2个机器或者节点,每个机器上有4块GPU。图中一共有4个进程,即world_size=4,那这样每个进程占用两块GPU,其中rank就是[0,1,2,3],每个节点的local_rank就是[0,1]了(4个进程,2个节点,平均每个节点2个进程),其中local_world_size 也就是2。 这里需要注意的是,local_rank是隐式参数,即torch自动分配的。比如local_rank可以通过自动注入命令行参数或者环境变量来获得)** 。
- 通常–nproc_per_node等于显卡数,也就是一个显卡分配一个进程
import torch.distributed as dist
import argparse, os
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=ine, default=0)
args = parser.parse_args()
dist.init_process_group("nccl")
rank = dist.get_rank()
local_rank_arg = args.local_rank # 命令行形式ARGS形式
local_rank_env = int(os.environ['LOCAL_RANK']) # 在利用env初始ENV环境变量形式
local_world_size = int(os.environ['LOCAL_WORLD_SIZE'])
print(f"{rank=}; {local_rank_arg=}; {local_rank_env=}; {local_world_size=}")
$ python3 -m torch.distributed.launch --nproc_per_node=4 test.py
rank=2; local_rank_arg=2; local_rank_env=2, local_world_size=4
rank=0; local_rank_arg=0; local_rank_env=0, local_world_size=4
rank=3; local_rank_arg=3; local_rank_env=3, local_world_size=4
rank=1; local_rank_arg=1; local_rank_env=1, local_world_size=4
初始化
torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None,...)
backend
- torch提供了
NCCL, GLOO,MPI
三种可用的后端 - CPU的分布式训练选择
GLOO
, GPU的分布式训练就用NCCL
即可
init_method
- 显式指定
init_method
,可以是TCP连接、File共享文件系统、ENV环境变量三种方式 - 显式指定
store
,同时指定world_size 和 rank参数。这里的store是一种分布式中核心的key-value存储,用于不同的进程间共享信息。
这两种方法是互斥的,其实本质上第一种方式是对第二种的一个更高的封装,最后都要落到store上进行实现。如果这两种方法都没有使用,默认使用init_method='env'
的方式来初始化。
三种init_method
:
init_method='tcp://ip:port'
: 通过指定rank 0(即:MASTER进程)的IP和端口,各个进程进行信息交换。 需指定 rank 和 world_size 这两个参数。init_method='file://path'
:通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数。init_method=env://
:从环境变量中读取分布式的信息(os.environ),主要包括MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE
。 其中,rank和world_size可以选择手动指定,否则从环境变量读取。
运行方法
torch.multiprocessing
(python的multiprocessing
的封装类)
mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def fn(rank, ws, nums):
dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765',
rank=rank, world_size=ws)
rank = dist.get_rank()
print(f"rank = {rank} is initialized")
torch.cuda.set_device(rank)
tensor = torch.tensor(nums).cuda()
print(tensor)
if __name__ == "__main__":
ws = 2
mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4]))
rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')
2. launch
$ python3 -m torch.distributed.launch --配置 train.py --args参数
# 通过外部命令运行
# 通过CUDA_VISIBLE_DEVICES控制可见的卡数
# 通过--nproc_per_node确定使用多少卡
CUDA_VISIBLE_DEVICES="0,1,2,3" python -m torch.distributed.run --nproc_per_node 4 train.py
常用配置有:
--nnodes: 使用的机器数量,单机的话,就默认是1了
--nproc_per_node: 单机的进程数,即单机的worldsize
--master_addr/port: 使用的主进程rank0的地址和端口
--node_rank: 当前的进程rank
在单机情况下, 只有--nproc_per_node 是必须指定的,--master_addr/port和node_rank都是可以由launch通过环境自动配置
- run
$ torchrun --nproc_per_node=4 train.py
命令torchrun
来代替torch.distributed.launch
- 完全使用环境变量配置各类参数,如
RANK,LOCAL_RANK, WORLD_SIZE
等,尤其是local_rank
不再支持用命令行隐式传递的方式 - 能够更加优雅的处理某个worker失败的情况,重启worker。需要代码中有
load_checkpoint(path)
和save_checkpoint(path)
这样有worker失败的话,可以通过load最新的模型,重启所有的worker接着训练 - 训练的节点数目可以弹性变化
模型训练
步骤
初始化进程组
dist.init_process_group
设置分布式采样器
DistributedSampler
使用
DistributedDataParallel
封装模型使用
torchrun
或者mp.spawn
启动分布式训练
1 分布式训练数据加载
- Dataloader需要把所有数据分成N份(N为worldsize), 并能正确的分发到不同的进程中,每个进程可以拿到一个数据的子集,不重叠,不交叉。
torch.utils.data.distributed.DistributedSampler(dataset,
num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)
- dataset: 需要加载的完整数据集
- num_replicas: 把数据集分成多少份,默认是当前dist的world_size
- rank: 当前进程的id,默认从dist的rank
- shuffle:是否打乱
- drop_last: 如果数据长度不能被world_size整除,可以考虑是否将剩下的扔掉
- seed:随机数种子。这里需要注意,从源码中可以看出,真正的种子其实是
self.seed+self.epoch
这样的好处是,不同的epoch每个进程拿到的数据是不一样,因此需要在每个epoch开始前设置下:sampler.set_epoch(epoch)
其实Sampler的实现也很简单,核心代码就一句:
indices[self.rank: self.total_size: self.num_replicas]
假设4卡12条数据的话,rank=0,1,2,3, num_replicas=4, 那么每个卡取的数据索引就是:
rank0: [0 4 8]; rank1: [1 5 9]; rank2: [2 6 10]; rank3: [3 7 11]
保证不重复不交叉。这样在分布式训练的时候,只需要给Dataloader指定DistributedSampler即可,简单示例如下:
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):
sampler.set_epoch(epoch) # 设置epoch 更新种子
train(loader)
- if args.local_rank == -1 表示关闭分布式
train_sampler = RandomSampler(train_dataset) if args.local_rank == -1 else DistributedSampler(train_dataset)
2 模型的分布式训练封装
torch.cuda.set_device(local_rank)
model = Model().cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
# 要调用model内的函数或者属性. model.module.xxxx
3 模型保存与加载
- 训练时候保存模型,只保存rank=0主进程模型,不需要dist.barrior(), all_reduce 操作保证了同步性
# 后面正常训练代码
optimizer = xxx
for epoch:
train_sampler.set_epoch(epoch)
for data in Dataloader:
model(data)
xxx
# 训练完成 只需要保存rank 0上的即可
# 不需要dist.barrior(), all_reduce 操作保证了同步性
if rank == 0:
torch.save(model.module.state_dict(), CHECKPOINT_PATH)
# 保存的是参数,不需要DDP包裹
torch.save(model.module.state_dict())
- 推理时候加载模型,需要barrier()其他保证rank 0保存完成。
model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH ="./model.checkpoint"
if rank == 0: # 主进程
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# barrier()其他保证rank 0保存完成
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))
这样在多卡训练时,每个进程有一个model副本和optimizer,使用自己的数据进行训练,之后反向传播计算完梯度的时候,所有进程的梯度会进行all-reduce操作进行同步,进而保证每个卡上的模型更新梯度是一样的,模型参数也是一致的。
这里有一个需要注意的地方,在save和load模型时候,为了减小所有进程同时读写磁盘,一般处理方法是以主进程为主,rank0先save模型,在map到其他进程。这样的另外一个好处,在最开始训练时,模型随机初始化之后,保证了所有进程的模型参数保持一致。
4 损失函数
- loss.backward() 不变
- 如果计算loss数值,用下面的all_reduce。或者也可以不用
# 仍然可以直接调用模型的train()方法
# 但是假如要调用其他你自己写的方法,就得model.module.func()
model.train()
for data in dataloader:
loss = model(data)
optimizer.zero_grad()
loss.backward() # 这个操作自动同步梯度
optimizer.step()
# 但是仍然需要累加得到所有进程loss的值的和
dist.all_reduce(loss, op=dist.ReduceOp.SUM)
# 然后除以并行数,就是这个batch的loss值了
loss /= world_size
模型推理
一般需要先所有进程的输出结果进行gather,再进行指标的计算,两个常用的函数:
dist.all_gather(tensor_list, tensor)
: 将所有进程的tensor进行收集并拼接成新的tensorlist返回,比如:dist.all_reduce(tensor, op)
这是对tensor
的in-place的操作, 对所有进程的某个tensor进行合并操作,op可以是求和等:- 拿到所有进程中模型的输出,最后统一计算指标
pred_list = []
for data in Dataloader:
pred = model(data)
batch_pred = [torch.zeros_like(label) for _ in range(world_size)]
dist.all_gather(batch_pred, pred)
pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 1)
# 所有进程pred_list是一致的,保存所有数据模型预测的值
def test(model, test_loader, device):
model.eval()
preds = []
labels = []
with torch.no_grad():
for data in test_loader:
inputs, truth = data
inputs = inputs.to(device)
truth = truth.to(device)
output = model(**inputs)['logits']
predict = torch.max(output.data, 1)[1]
cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
dist.all_gather(cur_preds, predict)
dist.all_gather(cur_truth, truth)
preds.extend(cur_preds)
labels.extend(cur_truth)
model.train()
predict = torch.cat(preds, 0)
labels = torch.cat(labels, 0)
correct = (predict == labels).sum().item()
return correct * 1.0 / len(predict)
注意事项
- 要把模型和数据放在进程对应的那张卡上
- 要使用Sampler来分发训练数据,并且shuffle不设置在Dataloder中而是Sampler中,每个epoch还需要调用Sampler的
set_epoch()
方法。(需要set_epoch来使用sampleer中shuffle,否则不是随机的) - 训练和验证区分较大,验证一般在主进程中进行一次验证即可,不需要sampler,操作和单卡一样,之后将数据同步给其他进程。
- 在多卡时要调用模型的其他方法或者使用单卡的模式,需要用
model.module
来获得原始模型,同样保存参数时也保存的是model.module
的参数而不是DDP包裹的。
使用总结
import argparse
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
parser = argparse.ArgumentParser()
parser.add_argument("--save_dir", default='')
parser.add_argument("--local_rank", default=-1)
parser.add_argument("--world_size", default=1)
args = parser.parse_args()
# 初始化后端 建议NCCL
# world_size 指的是总的并行进程数目
# 比如16张卡单卡单进程 就是 16
# 但是如果是8卡单进程 就是 1
# 等到连接的进程数等于world_size,程序才会继续运行
torch.distributed.init_process_group(backend='nccl',
world_size=ws,
init_method='env://')
torch.cuda.set_device(args.local_rank)
device = torch.device(f'cuda:{args.local_rank}')
model = nn.Linear(2,3).to(device)
# train dataset
# train_sampler
# train_loader
my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
# sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)
# 初始化 DDP,这里我们通过规定 device_id 用了单卡单进程
# 实际上根据我们前面对 parallel_apply 的解读,DDP 也支持一个进程控制多个线程利用多卡
model = DDP(model,
device_ids=[args.local_rank],
output_device=args.local_rank).to(device)
for epoch in range(epochs):
trainloader.sampler.set_epoch(epoch)
# 后面这部分,则与原来完全一致了。
for data, label in trainloader:
prediction = model(data)
loss = loss_fn(prediction, label)
loss.backward()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.step()
# 保存模型
if torch.distributed.get_rank() == 0:
torch.save(model.module.state_dict(),
'results/%s/model.pth' % args.save_dir)
区别
- 多进程
和 DP 不同, DDP 采用多进程,最推荐的做法是每张卡一个进程从而避免上一节所说单进程带来的影响。前文也提到了 DP 和 DDP 共用一个 parallel_apply 函数,所以 DDP 同样支持单进程多线程多卡操作,自然也支持多进程多线程,不过需要注意一下 world_size。 - 通信效率
DP 的通信成本随着 GPU 数量线性增长,而 DDP 支持 Ring AllReduce,其通信成本是恒定的,与 GPU 数量无关。 - 同步参数
DP 通过收集梯度到 device[0],在device[0] 更新参数,然后其他设备复制 device[0] 的参数实现各个模型同步;
DDP 通过保证初始状态相同并且改变量也相同(指同步梯度) ,保证模型同步。
- 多进程
源码
class DistributedDataParallel(Module): def __init__(self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False): super(DistributedDataParallel, self).__init__() assert any((p.requires_grad for p in module.parameters())), ( "DistributedDataParallel is not needed when a module " "doesn't have any parameter that requires a gradient." ) self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1 distinct_device_types = {p.device.type for p in module.parameters()} assert len(distinct_device_types) == 1, ( "DistributedDataParallel's input module must be on " "the same type of devices, but input module parameters locate in {}." ).format(distinct_device_types) self.device_type = list(distinct_device_types)[0] if self.device_type == "cpu" or self.is_multi_device_module: assert not device_ids and not output_device, ( "DistributedDataParallel device_ids and output_device arguments " "only work with single-device GPU modules, but got " "device_ids {}, output_device {}, and module parameters {}." ).format(device_ids, output_device, {p.device for p in module.parameters()}) self.device_ids = None self.output_device = None else: # Use all devices by default for single-device GPU modules if device_ids is None: device_ids = _get_all_device_indices() self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids)) if output_device is None: output_device = device_ids[0] self.output_device = _get_device_index(output_device, True) if process_group is None: self.process_group = _get_default_group() else: self.process_group = process_group self.dim = dim self.module = module self.device = list(self.module.parameters())[0].device self.broadcast_buffers = broadcast_buffers self.find_unused_parameters = find_unused_parameters self.require_backward_grad_sync = True self.require_forward_param_sync = True self.ddp_join_enabled = False self.gradient_as_bucket_view = gradient_as_bucket_view if check_reduction: # This argument is no longer used since the reducer # will ensure reduction completes even if some parameters # do not receive gradients. warnings.warn( "The `check_reduction` argument in `DistributedDataParallel` " "module is deprecated. Please avoid using it." ) pass # used for intra-node param sync and inter-node sync as well self.broadcast_bucket_size = int(250 * 1024 * 1024) # # reduction bucket size self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024) # 保证初始状态一样 # Sync params and buffers self._sync_params_and_buffers(authoritative_rank=0) # 下拉看源码 self._ddp_init_helper() def _sync_params_and_buffers(self, authoritative_rank=0): module_states = list(self.module.state_dict().values()) if len(module_states) > 0: self._distributed_broadcast_coalesced( module_states, self.broadcast_bucket_size, authoritative_rank) def _ddp_init_helper(self): """ Initialization helper function that does the following: (1) replicating the module from device[0] to the other devices (前文提到 DDP 也支持一个进程多线程利用多卡,类似 DP ,这时候就会用到第一步) (2) bucketing the parameters for reductions (把 parameter 分组,梯度通讯时,先得到梯度的会通讯) (3) resetting the bucketing states (4) registering the grad hooks (创建管理器) (5) passing a handle of DDP to SyncBatchNorm Layer (为 SyncBN 准备) """ def parameters(m, recurse=True): def model_parameters(m): ps = m._former_parameters.values() \ if hasattr(m, "_former_parameters") \ else m.parameters(recurse=False) for p in ps: yield p for m in m.modules() if recurse else [m]: for p in model_parameters(m): yield p if self.device_ids and len(self.device_ids) > 1: warnings.warn( "Single-Process Multi-GPU is not the recommended mode for " "DDP. In this mode, each DDP instance operates on multiple " "devices and creates multiple module replicas within one " "process. The overhead of scatter/gather and GIL contention " "in every forward pass can slow down training. " "Please consider using one DDP instance per device or per " "module replica by explicitly setting device_ids or " "CUDA_VISIBLE_DEVICES. " ) # only create replicas for single-device CUDA modules # # TODO: we don't need to replicate params in here. they're always going to # be broadcasted using larger blocks in broadcast_coalesced, so it might be # better to not pollute the caches with these small blocks self._module_copies = replicate(self.module, self.device_ids, detach=True) self._module_copies[0] = self.module for module_copy in self._module_copies[1:]: for param, copy_param in zip(self.module.parameters(), parameters(module_copy)): # Reducer requires param copies have the same strides across replicas. # Fixes up copy_param strides in case replicate didn't match param strides. if param.layout is torch.strided and param.stride() != copy_param.stride(): with torch.no_grad(): copy_param.set_(copy_param.clone() .as_strided(param.size(), param.stride()) .copy_(copy_param)) copy_param.requires_grad = param.requires_grad else: self._module_copies = [self.module] self.modules_params = [list(parameters(m)) for m in self._module_copies] self.modules_buffers = [list(m.buffers()) for m in self._module_copies] # Build tuple of (module, parameter) for all parameters that require grads. modules_and_parameters = [ [ (module, parameter) for module in replica.modules() for parameter in filter( lambda parameter: parameter.requires_grad, parameters(module, recurse=False)) ] for replica in self._module_copies] # Build list of parameters. parameters = [ list(parameter for _, parameter in replica) for replica in modules_and_parameters] # Checks if a module will produce a sparse gradient. def produces_sparse_gradient(module): if isinstance(module, torch.nn.Embedding): return module.sparse if isinstance(module, torch.nn.EmbeddingBag): return module.sparse return False # Build list of booleans indicating whether or not to expect sparse # gradients for the corresponding parameters. expect_sparse_gradient = [ list(produces_sparse_gradient(module) for module, _ in replica) for replica in modules_and_parameters] # The bucket size limit is specified in the constructor. # Additionally, we allow for a single small bucket for parameters # that are defined first, such that their gradients don't spill into # a much larger bucket, adding unnecessary latency after gradient # computation finishes. Experiments showed 1MB is a reasonable value. bucket_indices = dist._compute_bucket_assignment_by_size( parameters[0], [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap], expect_sparse_gradient[0]) # Note: reverse list of buckets because we want to approximate the # order in which their gradients are produced, and assume they # are used in the forward pass in the order they are defined. # 管理器 self.reducer = dist.Reducer( parameters, list(reversed(bucket_indices)), self.process_group, expect_sparse_gradient, self.bucket_bytes_cap, self.find_unused_parameters, self.gradient_as_bucket_view) # passing a handle to torch.nn.SyncBatchNorm layer self._passing_sync_batchnorm_handle(self._module_copies)
完整样例
# -*- coding: utf-8 -*-
import os
import torch
import torch.optim as optim
import torch.distributed as dist
from transformers import BertForSequenceClassification
from torch.utils.data.distributed import DistributedSampler
from utils import bert_name, collate_fn, load_data_and_labels, Data
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import time
epochs = 15
lr = 2e-5
batch_size = 64
def train():
# dist init
dist.init_process_group(backend='nccl', init_method='env://')
rank = dist.get_rank()
size = dist.get_world_size()
local_rank = int(os.environ['LOCAL_RANK'])
device = torch.device(f"cuda:{local_rank}")
torch.cuda.set_device(device)
# dataset
x_text, y = load_data_and_labels("./data/rt-polarity.pos", "./data/rt-polarity.neg")
x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.1)
train_data = Data(x_train, y_train)
test_data = Data(x_test, y_test)
train_sampler = DistributedSampler(train_data)
train_loader = DataLoader(train_data, batch_size=batch_size*size, collate_fn=collate_fn, sampler=train_sampler)
test_loader = DataLoader(test_data, batch_size=batch_size*size, shuffle=False,
collate_fn=collate_fn, sampler=DistributedSampler(test_data))
# model
model = BertForSequenceClassification.from_pretrained(bert_name, num_labels=2)
model.to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
optimizer = optim.Adam(model.parameters(), lr=lr*size)
best_acc = -0.1
print(f"rank {rank} start training...")
for epoch in range(1, epochs):
total_loss = 0.0
train_sampler.set_epoch(epoch)
model.train()
start_time = time.time()
for step, batch_data in enumerate(train_loader):
inputs, labels = batch_data
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
output = model(**inputs, labels=labels)
loss = output[0]
loss.backward()
optimizer.step()
total_loss += loss.item()
end_time = time.time()
acc = test(model, test_loader, device)
if acc > best_acc:
best_acc = acc
if rank == 0:
print(f"\t Epoch{epoch}: loss: {total_loss:.4f}, acc: {acc:.4f}, time: {(end_time - start_time):.2f}s")
if rank == 0:
print("*"*20)
print(f"finished; best acc: {best_acc:.4f}")
def test(model, test_loader, device):
model.eval()
preds = []
labels = []
with torch.no_grad():
for data in test_loader:
inputs, truth = data
inputs = inputs.to(device)
truth = truth.to(device)
output = model(**inputs)['logits']
predict = torch.max(output.data, 1)[1]
cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
dist.all_gather(cur_preds, predict)
dist.all_gather(cur_truth, truth)
preds.extend(cur_preds)
labels.extend(cur_truth)
model.train()
predict = torch.cat(preds, 0)
labels = torch.cat(labels, 0)
correct = (predict == labels).sum().item()
return correct * 1.0 / len(predict)
if __name__ == '__main__':
train()
DeepSpeed
DeepSpeed is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective.
Innovation pillars
Training
Argument Parsing
- 将
argparse
变成deepspeed parser
parser = deepspeed.add_config_arguments(parser)
- 样例
import argparse
import deepspeed
def add_argument():
parser=argparse.ArgumentParser(description='CIFAR')
#data
# cuda
parser.add_argument('--with_cuda', default=False, action='store_true',
help='use CPU in case there\'s no GPU support')
parser.add_argument('--use_ema', default=False, action='store_true',
help='whether use exponential moving average')
# train
parser.add_argument('-b', '--batch_size', default=32, type=int,
help='mini-batch size (default: 32)')
parser.add_argument('-e', '--epochs', default=30, type=int,
help='number of total epochs (default: 30)')
parser.add_argument('--local_rank', type=int, default=-1,
help='local rank passed from distributed launcher')
# Include DeepSpeed configuration arguments
parser = deepspeed.add_config_arguments(parser)
args=parser.parse_args()
return args
Initialization
- 必须把
args, model结构和参数
变成deepspeed版本 - 对于
dataloder
可以用deepspeed.initialize
变成分布式(需要传入trainset
),也可以自定义(不需要传入trainset
) - API
def initialize(args,
model,
optimizer=None,
model_parameters=None,
training_data=None,
lr_scheduler=None,
mpu=None,
dist_init_required=True,
collate_fn=None):
- 样例
parameters = filter(lambda p: p.requires_grad, net.parameters())
args=add_argument()
# Initialize DeepSpeed to use the following features
# 1) Distributed model
# 2) Distributed data loader
# 3) DeepSpeed optimizer
# 需要传入`trainset`
model_engine, optimizer, trainloader, _ = deepspeed.initialize(args=args, model=net, model_parameters=parameters, training_data=trainset)
# 不需要传入`trainset`
model_engine, optimizer, _, _ = deepspeed.initialize(args=args, model=net, model_parameters=parameters, training_data=None)
DataLoder
- 因为deepspeed基于pytorch.dist,所以可以用dataloader,注意自定义dataloder需要分布式采样
DistributedSampler
- 自定义分布式dataloder,注意batch_size定义,分布式的batch_size是指每张gpu上批大小,不开启分布式batch_size指所有gpu的总大小
# 开启分布式 用DistributedSampler
if local_rank >= 0:
if data_sampler is None:
data_sampler = DistributedSampler(dataset)
device_count = 1
# 不开启分布式 用 RandomSampler
else:
if data_sampler is None:
data_sampler = RandomSampler(dataset)
device_count = torch.cuda.device_count()
batch_size *= device_count
self.dataloader = DataLoader(self.dataset,
batch_size=self.batch_size,
pin_memory=self.pin_memory,
sampler=self.data_sampler,
num_workers=self.num_local_io_workers)
- 最好定义成生成器(相当于
IterableDataset
)
def __iter__(self):
self._create_dataloader()
return self
def __len__(self):
return self.len
def __next__(self):
if self.tput_timer:
self.tput_timer.start()
return next(self.data)
- 样例
import torch
import logging
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
class DeepSpeedDataLoader(object):
def __init__(self,
dataset,
batch_size,
pin_memory,
local_rank,
tput_timer,
collate_fn=None,
num_local_io_workers=None,
data_sampler=None):
self.tput_timer = tput_timer
self.batch_size = batch_size
if local_rank >= 0: # 开启分布式 用DistributedSampler
if data_sampler is None:
data_sampler = DistributedSampler(dataset)
device_count = 1
else: # 不开启分布式 用 RandomSampler
if data_sampler is None:
data_sampler = RandomSampler(dataset)
device_count = torch.cuda.device_count()
batch_size *= device_count
if num_local_io_workers is None:
num_local_io_workers = 2 * device_count
self.num_local_io_workers = num_local_io_workers
self.data_sampler = data_sampler
self.dataset = dataset
self.collate_fn = collate_fn
self.device_count = device_count
self.batch_size = batch_size
self.pin_memory = pin_memory
self.len = len(self.data_sampler)
self.data = None
def __iter__(self):
self._create_dataloader()
return self
def __len__(self):
return self.len
def __next__(self):
if self.tput_timer:
self.tput_timer.start()
return next(self.data)
def _create_dataloader(self):
if self.collate_fn is None:
self.dataloader = DataLoader(self.dataset,
batch_size=self.batch_size,
pin_memory=self.pin_memory,
sampler=self.data_sampler,
num_workers=self.num_local_io_workers)
else:
self.dataloader = DataLoader(self.dataset,
batch_size=self.batch_size,
pin_memory=self.pin_memory,
sampler=self.data_sampler,
collate_fn=self.collate_fn,
num_workers=self.num_local_io_workers)
self.data = (x for x in self.dataloader)
return self.dataloader
Training API
- 把读取的分布式
data
放到相应的显卡上inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)
- 注意
loss
和optimizer
分别是model_engine.backward(loss)
和model_engine.step()
,不需要optimizer.zero_grad()
. (Zeroing the gradients is handled automatically by DeepSpeed after the weights have been updated using a mini-batch.) - 样例
for i, data in enumerate(trainloader):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)
outputs = model_engine(inputs)
loss = criterion(outputs, labels)
model_engine.backward(loss)
model_engine.step()
Configuration
- 定义
JSON file (ds_config.json)
{
"train_batch_size": 4,
"steps_per_print": 2000,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
"betas": [
0.8,
0.999
],
"eps": 1e-8,
"weight_decay": 3e-7
}
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 0.001,
"warmup_num_steps": 1000
}
},
"wall_clock_breakdown": false
}
Running
$ deepspeed deepspeed.py --deepspeed --deepspeed_config ds_config.json
Inference
Inference API
deepspeed.init_inference()
returns an inference engine of typeInferenceEngine
.
for step, batch in enumerate(data_loader):
#forward() method
loss = engine(batch)
Accelerator
使用
import accelerate accelerator = accelerate.Accelerator() device = accelerator.device #获取当前进程的设备 ... # 进行封装 model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader) #训练时 loss.backward() 换为: accelerator.backward(loss)
accelerator.print
:仅仅在主进程输出accelerator.process_index
: 当前进程ID,没有使用rank命名,而是用的process_index来表示accelerator.is_local_main_process/is_main_processs:
: 是否local_rank 或则rank为0, 主进程accelerator.wait_for_everyone()
: 类似 dist.barrier() , 等所有进程到达这一步。accelerator.save
: 保存模型
Horovod
import horovod.torch as hvd
# 初始化
hvd.init()
# Samapler
# *此处num_replicas=hvd.size(), rank=hvd.rank()必须*
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# 优化器包装
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# 模型分发广播
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
# 模型训练不需要修改
$ horovodrun -np 4 -H localhost:4 python3 train.py