鹤啸九天 自律更自由,平凡不平庸 Less is More

分布式训练

2024-03-05
鹤啸九天
阅读量

Notes(温馨提示):

  1. ★ 首次阅读建议浏览:导航指南, 或划到本页末尾, 或直接点击跳转, 查看全站导航图
  2. 右上角工具条搜索文章,右下角二维码关注微信公众号(鹤啸九天),底栏分享、赞赏、评论
  3. ★ 转载请注明文章来源,知识点积累起来不容易,水滴石穿,绳锯木断,谢谢理解
  4. ★ 如有疑问,邮件讨论,欢迎贡献优质资料


分布式

【2021-10-13】OpenAI 研究员最新博客:如何在多GPU上训练真正的大模型?原文链接

  • 单个GPU卡的内存有限,许多大模型的大小已经超过了单个GPU,训练深且大的神经网络的主要方法有训练并行加速、各种模型架构以及内存节省设计等。
    • (1)并行加速方法有以下几种:
      • 数据并行性:将相同的模型权重复制到多个worker中,并将一部分数据分配给每个worker以同时进行处理。
      • 模型并行性
      • 流水线并行
      • 张量并行
    • (2)模型架构方面主要有专家混合(MoE)方法。
    • (3)节省内存的设计方法,如:CPU卸载、激活重新计算、混合精度训练、压缩以及内存高效优化器等等。

为什么要 多GPU

两种原因:

  • 第一种:模型在一块GPU上放不下,多块GPU上就能运行完整的模型(如早期的AlexNet)。
  • 第二种:多块GPU并行计算可达到加速训练的效果。

语言模型发展

设计分布式训练系统的一个最重要的原因

  • 单个计算设备的算力已经不足以支撑模型训练。

机器学习模型快速发展

  • 从2013年AlexNet开始,到2022年拥有5400亿参数的PalM模型,机器学习模型以每18个月增长56倍的速度发展。
  • 模型参数规模增大的同时,对训练数据量的要求也指数级增长,这更加剧了对算力的需求。

近几年CPU算力增加已经远低于 摩尔定律(Moore’s Law)

  • 虽然计算加速设备(如GPU、TPU等)为机器学习模型提供了大量的算力,但是其增长速度仍然没有突破每18个月翻倍的摩尔定律

为了能够满足机器学习模型发展,只有通过分布式训练系统才可以匹配模型不断增长的算力需求。

大语言模型参数量和数据量非常巨大,因此都采用了分布式训练架构完成训练。

  • OPT模型训练用了992块NVIDIA A100 80G GPU,采用全分片数据并行(Fully Sharded Data Parallel)以及Megatron-LM 张量并行(Tensor Parallelism),整体训练时间将近2个月。
  • BLOOM模型在硬件和所采用的系统架构方面的细节。训练一共花费3.5个月,使用48个计算节点。
    • 每个节点包含8块NVIDIA A100 80G GPU(总计384个GPU)
    • 并且使用 4*NVLink 用于节点内部GPU之间通信。节点之间采用四个 Omni-Path 100 Gbps网卡构建的增强8维超立方体全局拓扑网络进行通信。
  • LLaMA模型训练采用 NVIDIA A100 80GB GPU
    • LLaMA-7B 模型训练需要 82432 GPU小时
    • LLaMA-13B 模型训练需要 135168 GPU小时
    • LLaMA-33B 模型训练花费了 530432 GPU小时
    • LLaMA-65B 模型训练花费则高达 1022362 GPU小时。
模型 GPU型号 GPU数目 训练时间  
OPT A100 992 2个月 FSDP+TP
BLOOM A100 384 3.5个月  
LLaMA A100      

性能提速

在 pytorch1.7 + cuda10 + TeslaV100的环境下,使用ResNet34,batch_size=16, SGD对花草数据集训练的情况如下:

  • 1块 GPU需要9s一个epoch
  • 2块 GPU是5.5s
  • 8块 是2s。

问题

  • 为什么运行时间不是 9/8≈1.1s ?
  • 因为使用GPU数量越多,设备之间的通讯会越来越复杂,所以随着GPU数量的增加,训练速度的提升也是递减的。

误差梯度如何在不同设备之间通信?

  • 在每个GPU训练step结束后,将每块GPU的损失梯度平均,而不是每块GPU各计算各的。

BN如何在不同设备之间同步?

  • 假设 batch_size=2,每个GPU计算的均值和方差都针对这两个样本而言的。
  • 而BN的特性是:batch_size 越大,均值和方差越接近与整个数据集的均值和方差,效果越好。
  • 使用多块GPU时,会计算每个BN层在所有设备上输入的均值方差。如果GPU1和GPU2都分别得到两个特征层,那么两块GPU一共计算4个特征层的均值和方差,可以认为batch_size=4。
  • 注意:如果不用同步BN,而是每个设备计算自己的批次数据的均值方差,效果与单GPU一致,仅仅能提升训练速度;
  • 如果使用同步BN,效果会有一定提升,但是会损失一部分并行速度。

单GPU、是否使用同步BN训练的三种情况,可以看到

  • 使用同步BN(橙线)比不使用同步BN(蓝线)总体效果要好一些,不过训练时间也会更长。
  • 使用单GPU(黑线)和不使用同步BN的效果是差不多的。

两种GPU训练方法:DataParallelDistributedDataParallel

  • DataParallel是单进程多线程的,仅仅能工作在单机中。而DistributedDataParallel是多进程的,可以工作在单机或多机器中。
  • DataParallel通常会慢于DistributedDataParallel。所以目前主流的方法是DistributedDataParallel。
维度 DP DDP
运行环境 单机,单进程多线程 单/多机,多进程
速度
     

分布式模式

深度学习任务通用 GPU 进行模型训练。

  • 因为 GPU 相对于 CPU 具有更多的算术逻辑单元ALU),发挥并行计算的优势,特别适合计算密集型任务,更高效地完成深度学习模型的训练。
  • 更多 GPU 知识见站内专题 并行计算GPU

分析

  • 虽然 GPU 并行计算能力优异,但无法单独工作,必须由 CPU 进行控制调用;
  • 而且显存内存之间的频繁数据拷贝,可能带来较大的性能开销。
  • CPU 虽然计算能力不如 GPU,但可以独立工作,直接访问内存数据完成计算。

因此,想获得更好的训练性能,需要合理利用 GPU 和 CPU 的优势。

分布式目标

分布式训练总体目标: 提升总训练速度,减少模型训练的总体时间。

总训练速度公式:

  • 总训练速度 ∝ 单设备计算速度 X 计算设备总量 X 多设备加速比
  • 单设备计算速度主要由单块计算加速芯片的运算速度数据I/O能力来决定
    • 对单设备训练效率进行优化,主要技术手段: 混合精度训练算子融合梯度累加等;
  • 分布式训练系统中计算设备数量越多,其理论峰值计算速度就会越高,但是受到通信效率的影响,计算设备数量增大则会造成加速比急速降低;
  • 多设备加速比则由计算和通讯效率决定,结合算法和网络拓扑结构进行优化,分布式训练并行策略主要目标就是提升分布式训练系统中的多设备加速比。

CPU + GPU 工作模式

GPU 模式下的模型训练如图所示,分为4步:

  • 第1步,将输入数据从系统内存拷贝到显存。
  • 第2步,CPU 指示 GPU 处理数据。
  • 第3步,GPU 并行地完成一系列的计算。
  • 第4步,将计算结果从显存拷贝到内存。

多机协作

【2024-4-11】多机多卡协作

常见问题

模型训练的常见问题

  • 问题一:GPU 显存爆满,资源不足
    • V100 为例,其显存最高也仅有 32G,甚至有些显存仅 12G 左右。因此当模型的参数量较大时,在 GPU 模式下模型可能无法训练起来。
    • 设置 CPU 模式进行模型训练,可以避免显存不足的问题,但是训练速度往往太慢。
    • 如何在单机训练中充分地利用 GPU 和 CPU 资源,让部分层在 CPU 执行,部分层在 GPU 执行呢?
  • 问题二:频繁数据拷贝,训练效率低

分布式训练

资料

【2024-8-23】Github 分布式训练总结 tech_slides, pdf

【2024-5-27】 MIT 助理教授 Song Han 的 分布式训练介绍 ppt:

part1

part2

通信技术

分布式条件下的多进程、多worker之间的通信技术,常见的主要有:MPI、NCCL,GRPC等。

  • MPI主要是被应用在超算等大规模计算领域,机器学习场景下使用较少。主要是openMPI原语等。
  • NCCL是NVIDIA针对GPU设计的一种规约库,可以实现多GPU间的直接数据同步,避免内存和显存的,CPU和GPU间的数据拷贝成本。当在TensorFlow中选择单机多卡训练时,其默认采用的就是NCCL方式来通信。
  • GRPC是比较成熟的通信技术了,spark等框架内也都有用到。

演变

  • 早期MPI在CPU和GPU的分布式通信领域都是主力军
  • 在NCCL推出之后
    • MPI库现在就只用在了CPU分布式通信场景
    • 而GPU分布式通信库目前都是以NCCL为主(NV场景)。

通信方式

Pytorch 分布式训练通信依赖torch.distributed模块,torch.distributed提供了point-2-point communicationcollective communication两种通信方式。

  • 点对点 point-2-point communication(P2P)提供了send和recv语义,用于任务间的通信。
  • 收集 collective communication(CC)提供了 scatter/broadcast/gather/reduce/all_reduce/all_gather 语义,不同的backend在提供的通信语义上具有一定的差异性。

训练大模型主要是CC通信

GPU通信技术

【2024-6-17】GPU通信技术:GPU Direct、NVLink与RDMA

GPU通信技术是加速计算的关键,其中GPU DirectNVLinkRDMA是三种主流技术。

RDMA(Remote Direct Memory Access)是一种远程直接内存访问技术,允许一个设备直接访问另一个设备上的内存数据。在GPU通信中,RDMA技术用于加速GPU与CPU、GPU与GPU以及GPU与网络之间的数据传输。

DMA 是“直接内存读取”的意思,用来传输数据,它也属于外设。只是在传输数据时,无需占用CPU。

  • 高速IO设备可以在处理器安排下直接与主存储器成批交换数据,称为直接存储器访问(Directly Memory Access 简称DMA)

比如GPU与CPU之间存在着大量的数据传输.

  • CPU将需要显示的原始数据放在内存中,让GPU通过DMA的方式读取数据,经过解析和运算,将结果写至显存中,再由显示控制器读取显存中的数据并显示输出.

GPU与CPU集成至同一个处理器芯片时,能够大大减少芯片间的数据搬运,同时因为显存和内存的合并,会大大增加访存压力

DMA传输方向有三个:外设到内存内存到外设内存到内存

  • 外设内存。即从外设读取数据到内存。例如ADC采集数据到内存,ADC寄存器地址为源地址,内存地址为目标地址。
  • 内存外设。即从内存读取数据到外设。例如串口向电脑发送数据,内存地址为源地址,串口数据寄存器地址为目标地址。此时内存存储了需要发送的变量数据。
  • 内存内存。以内部flash向内部sram传输数据为例,此时内部flash地址即为源地址,内部sram地址即为目标地址。同时,需要将DMA_CCRx寄存器的MEM2MEM置位。
一、GPU Direct

GPU Direct 是一种优化GPU之间或GPU与第三方设备之间数据传输的技术。它通过共享内存访问点对点通信减少了数据复制和传输延迟。

(1) GPU Direct Shared Memory

2010年,NVIDIA推出了GPU Direct Shared Memory技术,允许GPU与第三方PCI Express设备通过共享的host memory实现共享内存访问。这使得内存空间得以共享,减少了数据复制,降低了数据交换延迟。

(2) GPU Direct P2P (Peer-to-Peer)

2011年,GPU Direct增加了Peer-to-Peer(P2P)技术,支持同一PCI Express总线上的GPU之间的直接访问和传输。这种技术绕过了CPU,使得GPU之间通信更加高效。

(3) GPU Direct RDMA

2013年,GPU Direct增加了RDMA(Remote Direct Memory Access)支持。

RDMA允许第三方PCI Express设备绕过CPU host memory,直接访问GPU内存。这种技术大幅提升了数据传输效率,尤其适用于高性能计算和数据中心等场景。

NVLink是一种专门设计用于连接NVIDIA GPU的高速互联技术。它通过点对点通信方式,绕过传统的PCIe总线,提供了更高的带宽和更低的延迟。

带宽与延迟 NVLink采用串行协议,支持双向数据传输,每个方向都有高达32GB/s的带宽。这使得两个GPU之间能够实现高速数据传输和共享,为多GPU系统提供了更高的性能和效率。与传统的PCIe总线相比,NVLink显著降低了通信延迟。

连接与扩展 NVLink可用于连接两个或多个GPU,以实现多GPU协同工作。这种连接方式简化了系统架构,提高了可扩展性。通过NVLink连接的GPU可以共享数据和计算资源,从而在某些应用中实现性能倍增。

三、RDMA

RDMA(Remote Direct Memory Access)是一种远程直接内存访问技术,允许一个设备直接访问另一个设备上的内存数据。在GPU通信中,RDMA技术用于加速GPU与CPU、GPU与GPU以及GPU与网络之间的数据传输。

DMA原理 在介绍RDMA之前,我们需要理解DMA(Direct Memory Access)原理。DMA是一种技术,允许硬件控制器直接从内存读取或写入数据,而不需要经过CPU。这大大减轻了CPU的负担,提高了数据传输效率。RDMA基于此原理,进一步扩展了其应用范围。

RDMA的优势 RDMA提供了高带宽和低延迟的数据传输能力。它利用网卡等设备的远程直接内存访问功能,允许设备之间快速高效地传输大量数据。在高性能计算、数据中心和云计算等领域,RDMA成为提高系统性能的关键技术之一。

GPU与RDMA的结合 通过将RDMA与GPU相结合,可以实现高性能的GPU通信。在这种配置中,GPU可以借助RDMA直接访问其他设备或网络的内存数据,从而避免了不必要的CPU中介和数据拷贝。这不仅提高了数据传输速率,还降低了CPU负载和功耗。

总结: GPU通信技术在加速计算领域发挥着越来越重要的作用。GPU Direct、NVLink和RDMA是三种主流的GPU通信技术,它们分别通过共享内存访问、高速互联和远程直接内存访问等方式提高了GPU之间的通信效率。在实际应用中,根据不同的场景和需求选择合适的通信技术至关重要。随着技术的不断发展,未来我们有望看到更多创新性的GPU通信解决方案,为高性能计算和数据中心等领域带来更大的性能提升。

如何选择

PyTorch 支持

torch.distributed 支持 3 种后端,分别为 NCCLGlooMPI

如何选择?

  • NCCL 目前最快,且对多进程分布式(Multi-Process Single-GPU)支持极好,可用于单节点以及多节点的分布式训练。
  • 节点即主机。即使是单节点,由于底层机制不同, distributed 也比 DataParallel 方式要高效。

基本原则:

  • 用 NCCL 进行分布式 GPU 训练
  • 用 Gloo 进行分布式 CPU 训练

无限带宽互联的 GPU 集群

  • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端

无限带宽和 GPU 直连

  • 使用 NCCL,因为其目前提供最佳的分布式 GPU 训练性能。尤其是 multiprocess single-node 或 multi-node distributed 训练。
  • 如果用 NCCL 训练有问题,再考虑使用 Cloo。(当前,Gloo 在 GPU 分布式上,相较于 NCCL 慢)

无限带宽互联的 CPU 集群

  • 如果 InfiniBand 对 IB 启用 IP,请使用 Gloo,否则使使用 MPI。
  • 在未来将添加 infiniBand 对 Gloo 的支持

以太网互联的 CPU 集群

  • 使用 Gloo,除非有特别的原因使用 MPI。

MPI 后端

MPI 即消息传递接口(Message Passing Interface),来自于高性能计算领域的标准的工具。

  • 支持点对点通信以及集体通信,并且是 torch.distributed 的 API 的灵感来源。
  • 使用 MPI 后端的优势: 在大型计算机集群上,MPI 应用广泛,且高度优化。

但是,torch.distributed 对 MPI 并不提供原生支持。

因此,要使用 MPI,必须从源码编译 Pytorch。是否支持 GPU,视安装的 MPI 版本而定。

Gloo 后端

gloo 后端支持 CPU 和 GPU,其支持集体通信(collective Communication),并对其进行了优化。

由于 GPU 之间可以直接进行数据交换,而无需经过 CPU 和内存,因此,在 GPU 上使用 gloo 后端速度更快。

torch.distributed 对 gloo 提供原生支持,无需进行额外操作。

NCCL 通信原语

【2023-7-27】大模型-LLM分布式训练框架总结

NCCL 的全称为 Nvidia 聚合通信库(NVIDIA Collective Communications Library),是一个可以实现多个 GPU、多个结点间聚合通信的库,在 PCIe、Nvlink、InfiniBand 上可以实现较高的通信速度。

NCCL 高度优化和兼容了 MPI,并且可以感知 GPU 的拓扑,促进多 GPU 多节点的加速,最大化 GPU 内的带宽利用率,所以深度学习框架的研究员可以利用 NCCL 的这个优势,在多个结点内或者跨界点间可以充分利用所有可利用的 GPU。

NCCL 对 CPU 和 GPU 均有较好支持,且 torch.distributed 对其也提供了原生支持。

对于每台主机均使用多进程的情况,使用 NCCL 可以获得最大化的性能。每个进程内,不许对其使用的 GPUs 具有独占权。若进程之间共享 GPUs 资源,则可能导致 deadlocks。

NCCL 英伟达集合通信库专用于多个 GPU 乃至多个节点间通信。

  • 专为英伟达的计算卡和网络优化,能带来更低的延迟和更高的带宽。

原语

  • Broadcast: 一对多的通信原语,一个数据发送者,多个数据接收者,可以在集群内把一个节点自身的数据广播到其他节点上。
  • Scatter: 一对多的通信原语,也是一个数据发送者,多个数据接收者,可以在集群内把一个节点自身的数据发散到其他节点上。与Broadcast不同的是,Broadcast把主节点0的数据发送给所有节点,而Scatter则是将数据进行切片再分发给集群内所有的节点。
  • Gather: 多对一的通信原语,具有多个数据发送者,一个数据接收者,可以在集群内把多个节点的数据收集到一个节点上。
  • AllGather: 多对多的通信原语,具有多个数据发送者,多个数据接收者,可以在集群内把多个节点的数据收集到一个主节点上(Gather),再把这个收集到的数据分发到其他节点上(broadcast),即收集集群内所有的数据到所有的节点上。
  • Reduce: 多对一的通信原语,具有多个数据发送者,一个数据接收者,可以在集群内把多个节点的数据规约运算到一个主节点上,常用的规约操作符有:求累加和SUM、求累乘积PROD、求最大值MAX、求最小值MIN、逻辑与LAND、按位与BAND、逻辑或LOR、按位或BOR、逻辑异或LXOR、按位异或BOXR、求最大值和最小大的位置MAXLOC、求最小值和最小值的位置MINLOC等,这些规约运算也需要加速卡支持对应的算子才能生效。
  • ReduceScatter: 多对多的通信原语,具有多个数据发送者,多个数据接收者,在集群内的所有节点上都按维度执行相同的Reduce规约运算,再将结果发散到集群内所有的节点上。Reduce-scatter等价于节点个数次的reduce规约运算操作,再后面执行节点个数的scatter次操作。其反向操作是AllGather。
  • AllReduce: 多对多的通信原语,具有多个数据发送者,多个数据接收者,在集群内的所有节点上都执行相同的Reduce操作,可以将集群内所有节点的数据规约运算得到的结果发送到所有的节点上。

通信原语汇总

汇总如下

原语操作 模式 说明 图解 示意图
Broadcast 广播:一对多 广播行为:从节点0广播相同信息到其它节点(0-3)
Scatter 一对多 另一种广播,从节点0将数据不同部分按需发送到不同节点,常见于DP的数据分配起步阶段
Reduce 规约:多对一 规约操作,Reduce意为减少/精简,一系列简单聚合运算,如:sum/min/max,prod,lor等
AllReduce 多对多 所有节点上应用相同的Reduce操作,单节点上 Reduce + Broadcast,最消耗带宽
Gather 多对一 反向Scatter:将多个Sender的数据汇总到单个节点上
AllGather 多对多 收集所有节点到所有节点上, AllGather=Gather+Broadcast
ReduceScatter   将单节点输入求和,再0维度按卡切分并发送, ReduceScatter=Reduce+Scatter
All2All   全交换操作,每个节点都获取其他节点的值

All2All 与 All Gather 区别在于:LLM分布式训练第一课(通讯原语)

  • All Gather 操作中,不同节点向某一节点收集到的数据是完全相同的
  • 而在 All2All 中,不同的节点向某一节点收集到的数据是不同的。

AllReduce 的目标: 将不同机器上的数据整合(reduce)后分发给各个机器

AllReduce 实现方法

  • 最简单: 每个worker将自己的数据广播给所有worker —— 问题: 大量浪费
  • 改进: 主从架构, 指定一个worker作为master,负责整合运算,以及分发 —— 问题: master成为网络瓶颈
  • 改进: Ring AllReduce

Ring AllReduce:

  • 第一阶段,将N个worker分布在一个环上,并且把每个worker的数据分成N份。
  • 第二阶段,第k个worker把第k份数据发给下一个worker,同时从前一个worker收到第k-1份数据。
  • 第三阶段,worker把收到的第k-1份数据和自己的第k-1份数据整合,再将整合的数据发送给下一个worker
  • 此循环N次之后,每一个worker都会包含最终整合结果的一份。

假设每个worker的数据是一个长度为S的向量,那么Ring AllReduce里每个worker发送的数据量是O(S),和worker的数量N无关。避免了主从架构中master需要处理O(S*N)数据量而成为网络瓶颈的问题。

Ring All-reduce

  • Pytorch 实现: DistributedDataParallel
  • Ring All-reduce=reduce-scatter+all-gather

NCCL 通信行为分析

【2024-5-10】集合通信行为分析 - 基于NCCL

deepspeed 启动多卡训练时,日志里会打印NCCL通信信息,这些日志都是什么意思?

NCCL 通信阶段

  • Phase 1 - 启动阶段 Bootstrap Phase: 初始化集合中的所有节点(node)和卡(rank),确保所有卡知道彼此
    • Initiate all nodes and then all ranks in a collective. It makes sure all ranks know about all other ranks, so any rank is able to communicate with any other rank.
  • Phase 2 - 拓扑阶段 Topology Phase: 每隔节点了解机器上各个硬件(CPU/GPU/NIC)映射关系, 创建内部拓扑结构(树/环),通过PCI和NVLink通信
    • Each node detects and maps out what hardware is located on the machine.
    • Hardware includes CPUs, GPUs, NICs and interconnect types.
    • Each node then creates an intra-machine graph, connects hardware with PCIe or NVLink interconnect, and evaluates the graph.
    • When the intra-machine topology is decided, the system will decide what pattern to use for the whole system.
    • The two main patterns are a tree or a ring.
    • While the topology is evaluated, NCCL is also tuning it by performing tests. This allows each rank to pre-compute thresholds for message sizes.
  • Phase 3 - 聚合阶段 Collective Phase: 用户调用NCCL支持的集合通信原语进行通信
    • A user can dispatch many collective operations using the same topology.

用户调用NCCL支持的集合通信原语进行通信:

  • 集合通信原语
    • AllReduce
    • Broadcast
    • Reduce
    • AllGather
    • ReduceScatter
  • 点对点通信原语
    • Send
    • Recv

NCCL在getAlgoInfo里面使用ncclTopoGetAlgoTime来遍历计算(algorithm, protocol),最终选择预测会最快做完指定数据量的指定集合通信原语的algorithm和protocol完成该通信原语。

示例

  • 以2机16卡, NCCL 2.8.4为例
  • NCCL会构建tree,ring graph。

(1) tree

解析

拓扑log格式

# IP: hostname:pid:tid [cudaDev] NCCL INFO Trees [channel ID] down0 rank/down1 rank/down2 rank->current rank->up rank
10.0.2.11: 2be7fa6883db:57976:58906 [5] NCCL INFO Trees [0] 14/-1/-1->13->12 [1] 14/-1/-1->13->12
# 10.0.2.11上的设备5,其rank为13,有两棵树,分别为channel 0和channel 1: channel 0的子节点只有14, 父节点为12; channel 1一样。

channel log格式

# IP: hostname:pid:tid [cudaDev] NCCL INFO Channel [channel ID] current rank[bus ID]->successor rank[bus ID] via transport type
10.0.2.11: 2be7fa6883db:57976:58906 [5] NCCL INFO Channel 00 : 13[3e000] -> 14[40000] via P2P/IPC
# 10.0.2.11上的设备5(rank 为13, bus ID为3e000),其channel 0连接至rank 14,传输方式为P2P/IPC。

结果

依此解析,可得两棵一样的tree,逻辑拓扑如下:img

(2) ring

Ring Logical Topology

拓扑log格式

# IP: hostname:pid:tid [cudaDev] NCCL INFO Channel ring_ID/ring_number: rank0 rank1 … last_rank
10.0.2.12: 94f182076445:82261:83141 [0] NCCL INFO Channel 00/02 : 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# 建成了02个ring,其中第0个ring的成员有:0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15,该ring共由16个rank组成。

channel log格式

  • 与tree拓扑的格式一致。

可得两个一样的ring,逻辑拓扑如下:img

梯度压缩

分布式训练的 bandwidth 与 latency bottleneck 主要分布在 梯度 all-reducescatter 过程中

  • 其中 联邦学习 受限的地方还有与端侧设备的通讯

解法:梯度压缩

  • 方法1: prune, 【Deep gradient compression】
    • worker 向 server push 梯度时, 可以对梯度做 prune (sparse gradient) 与 quantization
  • 方法2: Low-Rank 【PowerSGD】,梯度映射到低秩空间,而不是去做细粒度的剪枝和量化
    • 2019年 EPFL 的文章 PowerSGD, 发了 NIPS
  • 方法3: 量化, 1bit SGD
    • 用 one bit 的矩阵作为需要通讯的梯度
  • 方法5: terngrad, ternery
    • 梯度量化到 0, -1, 1

梯度延迟更新:解决 latency 的 bottleneck

详见: 分布式训练优化–进阶篇

并行技术

并行技术:

  • 数据并行 dp(如:PyTorch DDP): 每个节点复制完整的模型,数据分片
    • 内存开销大,通信量低,容易实施
    • ZeRO 对 data-parallel 的优化,每个gpu有自己独特的数据,同时模型的参数也被均匀的分到 n个gpu上
  • 模型并行 mp : 完整模型只有一份,其它节点只有模型的局部
    • 张量并行 tp: 模型按张量分发
      • 内存开销小,通信量高,容易实施
      • 如:Megatron-LM(1D)、Colossal-AI(2D、2.5D、3D)
    • 流水线并行 pp: 模型按层分发
      • 内存开销小,通信量中等,实施难度大
      • 如:GPipe、PipeDream、PipeDream-2BW、PipeDream Flush(1F1B)
  • 多维混合并行(如:3D并行(数据并行、模型并行、流水线并行))
    • 2D 并行: dp+pp, tp+pp
    • 3D 并行: dp+tp+pp
  • 自动并行: 自动搜索并行空间
    • 如:Alpa(自动算子内/算子间并行)), 将并行空间分为 inter-op (pipeline) 与 intra-op (tensor并行),使用NAS搜索这两个空间,考虑整个搜索空间的cost。
  • 优化器相关并行(如:ZeRO(零冗余优化器,在执行的逻辑上是数据并行,但可以达到模型并行的显存优化效果)、PyTorch FSDP)

【2023-12-15】MIT 端侧模型训练课程: TinyML and Efficient Deep Learning Computing, 含 ppt 和 视频

  • powerful deep learning applications on resource-constrained devices.
  • Topics include model compression, pruning, quantization, neural architecture search, distributed training, data/model parallelism, gradient compression, and on-device fine-tuning. application-specific acceleration techniques

模型切分分3个互相正交的维度:[data, model-layer, model-activation(Tensor)]

  • 这3个维度互不影响,可同时实现,即 3D parallelism
并行维度 切分方式 Split 模型完整性 通讯 对gpu的利用 优化手段
data data Copy of whole model 非常少 (只有前向和反向的时候需要通讯) High (for 训练加速) ZeRO
pipeline model-layer Part of model 中等 Low (for 显存占用太多,优化显存)  
tensor model-tensor Part of model 很多(每个需要reduce的中间结果都要通讯) High (模型算到底再通讯)  

详见: 分布式训练优化–进阶篇

常见多GPU训练方法:

  1. 模型并行:如果模型特别大,GPU显存不够,无法将一个显存放在GPU上,需要把网络的不同模块放在不同GPU上,这样可以训练比较大的网络。(下图左半部分)
  2. 数据并行:将整个模型放在一块GPU里,再复制到每一块GPU上,同时进行正向传播反向误差传播。相当于加大了batch_size。(下图右半部分)

大规模深度学习模型训练中有几个主要范式:

  • 数据并行(DP):模型尺寸能够被单个GPU 内存容纳,模型的不同实例在不同的 GPU 和不同批数据上运行,模型的每个实例都使用相同的参数进行初始化,但在前向传递期间,不同批次的数据被发送到每个模型。 收集来自每个模型实例的梯度并计算梯度更新。,然后更新模型参数并将其作为更新发送到每个模型实例。
    • 数据并行通过在 N 台机器上复制模型来实现。拆分 minibatch ,分成 N 个块,让每台机器处理一个块。
  • 模型并行:当单个 GPU无法容纳模型尺寸时,模型并行性变得必要,有必要将模型拆分到多个 GPU 上进行训练。实现模型尺寸超过单个GPU显存的深度学习模型训练。
    • 这种方法的问题是计算使用效率不高,因为在任何时间点只有一个 GPU 正在使用,而其他 GPU 处于空闲状态。
    • 相对于流水线并行和数据并行,模型并行具有以下优点:
      • 支持更大的模型规模:流水线并行和数据并行的限制通常是 GPU 内存大小和 GPU 数量,而模型并行可以支持更大的模型规模,因为模型可以分割成多个子模型,并分配到多个 GPU 上运行。
      • 减少通信开销:流水线并行的模型划分通常会导致模型层之间的通信,而模型并行只需在每个子模型之间进行通信。相对于数据并行,模型并行在执行过程中通信量更少,因为每个 GPU 只需传递模型的一部分而不是全部。
      • 灵活的模型分配:模型并行可以更灵活地将模型分配给不同的 GPU 或计算节点,这意味着可以在不同的 GPU 上运行不同的模型子集,从而实现更好的负载平衡和性能优化。
  • 流水线并行 (PP)
    • 朴素流水线并行(Naive Pipeline Parallelism)是将一组模型层分布在多个 GPU 上,并简单地将数据从 GPU 移动到 GPU,就好像它是一个大型复合 GPU 一样。
    • 流水线并行 (PP) 与上述朴素流水线并行几乎相同,但它解决了 GPU 闲置问题,方法是将传入的 batch 为 micro-batches 并人工创建流水线,从而允许不同的 GPU 同时参与计算过程。
    • 流水并行是将一个大型计算任务拆分成多个小的子任务,并将子任务在多个处理单元上同时执行。不同于数据并行和模型并行,流水并行不是将数据或模型分割成多个部分并在处理单元间并行处理,而是将一系列计算步骤分解成多个流水阶段,并在多个处理单元上同时执行,以减少总体计算时间。

通俗理解

  • Data Parallelism:模型1台设备装得下,所以同模型用多份数据分开训练
  • Pipeline Parallelism:模型装不下,模型1层或多层1台设备装得下,所以同模型按层拆开训练
  • Tensor Parallelism:模型1层都装不下,所以层内拆开训练

数据并行

数据并行性(Data parallelism (DP))最简单的方法是:将相同的模型权重复制到多个worker中,并将一部分数据分配给每个worker以同时进行处理。

  • 如果模型规模大于单个GPU的内存,Naive DP无法正常工作时。GeePS(Cui 等人,2016 年)之类的方法将暂时未使用的参数卸载回 CPU,以使用有限的 GPU 内存。数据交换传输在后端进行,且不干扰训练计算。

在每个小批量结束时,workers需要同步梯度或权重,以替换旧参数。常见有两种主要的同步方法,它们都有明确的优缺点:

  • 1)大容量同步并行( Bulk synchronous parallels (BSP)):workers在每个小批量结束时同步数据。这种方法可以防止模型权重过时,同时获得良好的学习效率,但每台机器都必须停止并等待其他机器发送梯度。
  • 2)异步并行(Asynchronous parallel (ASP)):每个GPU工作进程异步处理数据,无需等待或暂停。然而,这种方法很容易导致网络使用陈旧的权重参数,从而降低统计学习效率。即使它增加了计算时间,也可能不会加快收敛的训练时间。

中间的某个地方是在每次x迭代时,全局同步梯度(x>1)。自Pytorch v1.5版(Li等人,2021年)以来,该特征在平行分布数据(DDP)中被称为“梯度累积”。Bucket 梯度计算方法避免了梯度的立即AllReduce,而是将多个梯度变化值存储到一个AllReduce中以提高吞吐量,可以基于计算图进行计算和通信调度优化。

模型并行(大模型)

【2023-8-28】模型并行最佳实践(PyTorch)

DataParallel的优缺点如下:

  • 优点:将模型复制到所有GPU,其中每个GPU消耗输入数据的不同分区,可以极大地加快训练过程。
  • 缺点:不适用于某些模型太大而无法容纳单个GPU的用例。

模型并行性(Model parallelism: MP)目的是解决模型权重不能适应单个节点的情况,通过将计算和模型参数分布在多台机器上进行训练。

  • 数据并行中,每个worker承载整个模型的完整副本
  • 而模型并行中,每个worker上只分配模型参数的一小部分,从而减少了内存使用和计算。

原理

  • 将单个模型拆分到不同GPU上,而不是在每个GPU上复制整个模型
  • 将模型不同子网放置到不同设备上,并相应地实现该 forward方法以在设备之间移动中间输出。由于模型的一部分只能在任何单个设备上运行,因此一组设备可以共同为更大的模型服务。

模型 m 包含10层:

  • DataParallel: 每个GPU都具有这10层中每层副本
  • 而在两个GPU上使用模型并行时,每个GPU可以托管5层

由于深度神经网络通常包含一堆垂直层,因此将一个大型模型逐层拆分感觉很简单,其中一组连续的小层被分组到一个工作层上的一个分区中。然而,通过多个具有顺序依赖性的工作线程来运行每个数据批,会导致大量的等待时间和计算资源利用率低下的问题。

模型并行有两种:张量并行 和 流水线并行

  • 张量并行是在一个操作中进行并行计算,如:矩阵-矩阵乘法。
  • 流水线并行是在各层之间进行并行计算。

总结

  • 张量并行是层内并行,流水线并行是层间并行。

流水线并行(综合模型+数据)

通道并行(Pipeline parallelism: PP)将模型并行数据并行相结合,以减少部分训练过程中出现的空闲时间。

主要思想

  • 将一个小批量拆分为多个微批次,并使worker在每个阶段中能够同时处理一个微批次。需要注意的是,每个微批次需要两次传递,一次向前,一次向后。worker之间的通信仅传输激活(向前)和梯度(向后)。这些通道的调度方式以及梯度的聚合方式在不同的方法中有所不同。分区(workers)的数量也称为通道深度。

模型按层分割成若干块,每块都交给一个设备。

  • 前向传播: 每个设备将中间激活传递给下一个阶段。
  • 后向传播: 每个设备将输入张量梯度传回给前一个流水线阶段。

这允许设备同时进行计算,从而增加训练的吞吐量。

  • img

缺点

  • 训练设备容易出现空闲状态(因为后一阶段等待前一阶段执行完毕),导致计算资源的浪费,加速效率没有数据并行高。
  • img

典型的流水线并行实现:

  • GPipe、PipeDream、PipeDream-2BW、PipeDream Flush(1F1B)。

张量并行(水平分割)

模型并行和流水线并行都将一个模型垂直分割,可以将一个张量操作的计算水平分割到多个设备上,称为张量并行(tensor parallelism,TP)。

  • 张量并行将张量沿特定维度分成 N 块,每个设备只持有整个张量的 1/N,同时不影响计算图的正确性。
  • 这需要额外的通信来确保结果的正确性。

以当下比较流行的transformer为例,transformer模型主要由多层MLP和自我注意块组成。Megatron-LM(Shoeybi et al.2020)等人采用了一种简单的方法来并行多层计算MLP和自我注意。变压器中的MLP层包含GEMM(通用矩阵乘法)和非线性GeLU传输,按列拆分权重矩阵A

典型的张量并行实现:

  • Megatron-LM(1D)
  • Colossal-AI(2D、2.5D、3D)

多维混合并行

多维混合并行指将数据并行模型并行流水线并行结合起来进行分布式训练。

超大规模模型的预训练和全参数微调时,都需要用到多维混合并行。

2D 并行

主要有

  • Data 并行+ pipeline 并行
    • Deepspeed web-link给出了 pipeline 和 data-parallel 的2D并行示意图,其中 rank0 和 rank1 为 data-parallelism, rank0里的 gpu-0 和 gpu-2 进行 pipeline 并行,他们交替进行前向和反向过程,疑问的是(这里没有模型运行的最终的loss,如何进行反向传播呢?)
  • Tensor 并行 + pipeline

3D 并行

3D并行 => Tensor + pipeline + data

异构系统并行

与 GPU 相比,CPU 内存要大得多。

  • 典型服务器上,CPU 可以轻松拥有几百GB甚至上TB的内存,而每张 GPU 卡通常只有 48 或 80 GB的内存。

为什么 CPU 内存没有被用于分布式训练?

  • 依靠 CPU 甚至是 NVMe 磁盘来训练大型模型。
  • 主要想法: 在不使用张量时,将其卸载回 CPU 内存或 NVMe 磁盘。

通过使用异构系统架构,有可能在一台机器上容纳一个巨大的模型。

自动搜索并行空间

alpa

Alpa 将并行空间分为 inter-op (pipeline) 与 intra-op (tensor并行),使用 NAS搜索这两个空间,考虑整个搜索空间的cost。

  • 首先搜索 inter-op 的搜索空间, 制定 pipeline 并行策略
  • 然后搜索 intra-op空间, 指定 data-para 与 operator-para 策略(包括两种)
  • Data para
  • Operator parallel (weight 广播,input拆分)
  • Operator parallel (weight 拆分,input拆分) –> 需要增加 all-reduce cost

UCB博士 郑怜悯 的工作, 他还参加过其他项目 Ansor,TVM, vLLM, FastChat,LMSYS-Chat-1M

模型训练开销

神经网络模型占用的显存包括:

  • 模型自身的参数
  • 模型的输出

全连接网络(不考虑偏置项b): Y = XW + b

  • X 是 B*M 维
  • W 是 MN 或 NM 维
  • Y 是 B*N 维

显存占用包括:

  • 参数:二维数组 W
  • 模型的输出: 二维数组 Y
  • X是上一层的输出,因此显存占用归于上一层。

显存占用就是W和Y两个数组?非也

模型训练流程

参数的显存占用

【2023-8-30】大模型要占你多少内存?这个神器一键测量,误差低至0.5MB,免费可用

大模型训练推理要用多少内存?

  • HuggingFace Space上的最新火起来工具——Model Memory Calculator,模型内存测量器,在网页端人人可体验。
  • 比如模型bert-base-case Int8估计占用413.18 MB内存,实际占用为413.68MB,相差0.5MB,误差仅有0.1%。

实际推理过程,EleutherAI 发现需要在预测数据基础上,预留20%的内存

【2023-8-30】baichuan-7b (14G) 部署失败,空间不够

  • GPU: A30, 24G 显存

错误信息:

torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 86.00 MiB (GPU 0; 22.20 GiB total capacity; 7.47 GiB already allocated; 51.12 MiB free; 7.48 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

Model Memory Calculator计算的开销

Memory Usage for ‘baichuan-inc/Baichuan-7B’

dtype Largest Layer or Residual Group Total Size Training using Adam
float32 1000.0 MB 26.2 GB 104.82 GB
float16/bfloat16 500.0 MB 13.1 GB 52.41 GB
int8 250.0 MB 6.55 GB 26.2 GB
int4 125.0 MB 3.28 GB  

只有有参数的层,才会有显存占用。这部份的显存占用和输入无关,模型加载完成之后就会占用。

有参数的层主要包括:

  • 卷积
  • 全连接
  • BatchNorm
  • Embedding层
  • … …

无参数的层:

  • 多数的激活层(Sigmoid/ReLU)
  • 池化层
  • Dropout
  • … …

模型参数数目(不考虑偏置项b)为:

  • Linear(M->N): 参数数目:M×N
  • Conv2d(Cin, Cout, K): 参数数目:Cin × Cout × K × K
  • BatchNorm(N): 参数数目: 2N
  • Embedding(N,W): 参数数目: N × W

参数占用显存 = 参数数目 × n

  • n = 4 :float32
  • n = 2 : float16
  • n = 8 : double64

PyTorch中,当执行完 model=MyGreatModel().cuda() 后就会占用相应的显存,占用的显存大小基本与上述分析的显存差不多(会稍大一些,因为其它开销)。

梯度与动量的显存占用

优化器

  • SGD:W_t+1 = W_t - α * ▽ F(W_t)
    • 除了保存权重W, 还要保存对应的梯度 ▽ F(W_t) ,因此, 显存占用等于参数占用显存 x2
  • 带Momentum-SGD:
    • v_t+1 = ρv_t + ▽ F(W_t)
    • W_t+1 = W_t - α * v_t+1
    • 还需要保存动量, 因此显存 x3
  • Adam优化器
    • 动量占用的显存更多,显存x4

总结,模型中与输入无关的显存占用包括:

  • 参数 W
  • 梯度 dW(一般与参数一样)
  • 优化器的动量
    • 普通SGD没有动量,momentum-SGD动量与梯度一样,Adam优化器动量的数量是梯度的两倍

输入输出的显存占用

以CNN为例,模型输出的显存占用,总结如下:

  • 需要计算每一层的feature map的形状(多维数组的形状)
  • 需要保存输出对应的梯度用以反向传播(链式法则)
  • 显存占用与 batch size 成正比
  • 模型输出不需要存储相应的动量信息。

深度学习中神经网络的显存占用,可以得到如下公式:

显存占用 = 模型显存占用 + batch_size × 每个样本的显存占用

显存不是和batch-size简单的成正比,尤其是模型自身比较复杂的情况下:比如全连接很大,Embedding层很大

另外需要注意:

  • 输入(数据,图片)一般不需要计算梯度
  • 神经网络每层输入/输出都需要保存下来,用来反向传播,但是在某些特殊的情况下,不要保存输入。
    • 比如 ReLU,PyTorch中,使用nn.ReLU(inplace = True) 能将激活函数ReLU的输出直接覆盖保存于模型的输入之中,节省不少显存。
    • 这时候是如何反向传播? (提示:y=relu(x) -> dx = dy.copy();dx[ y<=0 ] =0)

节省显存的方法

深度学习中,一般占用显存最多的是卷积等层的输出,模型参数占用的显存相对较少,而且不太好优化。

节省显存方法:

  • 降低 batch-size
  • 下采样 (NCHW -> (1/4)*NCHW)
  • 减少全连接层(一般只留最后一层分类用的全连接层

更多信息见原文

训练时显存不足怎么办?

常见的节省显存操作,优先级从高到低排列。

  1. 去掉 compute_metrics:
    • 有些代码会在输出层后计算rouge分等,这个会输出一个 batch_sizevocab_sizeseq_len 的一个大向量,非常占显存。
  2. 采用bf16/fp16进行混合精度训练:
    • 现在大模型基本上都采用 bf16 来进行训练
    • 但是 v100 不支持 bf16,可以采用fp16进行训练。显存占用能够降低1倍。
  3. Flash attention:不仅能够降低显存,更能提高训练速度。
  4. batch_size 调小:
    • batch size 与模型每层激活状态所占显存呈正相关
    • 降低 batch size 能够很大程度上降低这部分显存占用。
  5. 采用梯度累积
    • global_batch_size = batch_size * 梯度累积
    • 如果降低 batch_size 后想保持 global_batch_size 不变,可适当提高梯度累积值。
  6. 选择合适的上下文长度
    • 上下文长度与激活状态所占显存呈正相关
    • 因此可适当降低上下文长度来降低显存占用。
  7. DeepSpeed Zero
    • 显存占用从高到低为:Zero 1 > Zero 2 > Zero 2 + offload > zero 3 > zero 3 + offload
    • 推荐最多试到 Zero2 + offload
  8. 选择更小的基座模型:在满足需求的情况下,尽量选择更小的基座模型。

慎重选择:

  1. Lora: 能跑全参就别跑 LoraQlora,一方面是麻烦,另一方面的确是效果差点。
  2. Qlora: Qlora 速度比lora慢,但所需显存更少,实在没资源可以试试。
  3. Megatron-LM: 可采用流水线并行和张量并行,使用比较麻烦,适合喜欢折腾的同学。
  4. Pai-Megatron-LM: Megatron-LM 的衍生,支持 Qwen 的sft和pt,坑比较多,爱折腾可以试试。
  5. 激活检查点:不推荐,非常耗时。在反向传播时重新计算深度神经网络的中间值。用时间(重新计算这些值两次的时间成本)来换空间(提前存储这些值的内存成本)。

GPU 要存哪些参数

【2023-6-28】参考

模型训练中,GPU 要存储的参数

  • 模型本身的参数、优化器状态、激活函数的输出值、梯度、一些零时的Buffer
  • img

模型参数仅占所有数据的小部分

  • 当进行混合精度运算时,模型状态参数(优化器状态 + 梯度+ 模型参数)占大半以上。

因此,要想办法去除模型训练过程中的冗余数据。

LLaMA-6B 占用多大内存

【2023-7-13】LLaMA-6B 占用多大内存?计算过程

精度对所需内存的影响:

  • fp32精度,一个参数需要 32 bits, 4 bytes.
  • fp16精度,一个参数需要 16 bits, 2 bytes.
  • int8精度,一个参数需要 8 bits, 1 byte.

模型需要的RAM大致分三个部分:

  • 模型参数: 参数量*每个参数所需内存
    • 对于fp32,LLaMA-6B需要 6B*4 bytes = 24GB 内存
    • 对于int8,LLaMA-6B需要 6B*1 byte = 6GB 内存
  • 梯度: 参数量*每个梯度参数所需内存
  • 优化器参数: 不同的优化器所储存的参数量不同。
    • 对于常用的AdamW,需要储存两倍的模型参数(用来储存一阶和二阶momentum)。
    • fp32 的 LLaMA-6B,AdamW需要 6B*8 bytes = 48 GB
    • int8 的 LLaMA-6B,AdamW需要 6B*2 bytes = 12 GB
  • 其它
    • CUDA kernel也会占据一些RAM,大概1.3GB左右

综上,int8 精度的 LLaMA-6B 模型部分大致需要 6GB + 6GB + 12GB + 1.3GB = 25.3GB 左右。

再根据LLaMA的架构(hidden_size= 4096, intermediate_size= 11008, num_hidden_layers= 32, context_length = 2048)计算中间变量内存。每个instance需要: ( 4096+11008 ) * 2048 * 32 * 1 byte = 990 MB

所以,一张 A100(80GB RAM)大概可以在int8精度,batch_size = 50 的设定下进行全参数训练。

7B 占用多大内存

一个7B规模大模型(如LLaMA-2 7B),基于16-bit混合精度训练时

  • 仅考虑模型参数、梯度、优化器情况下,显存占用就有112GB
    • 参数占 GPU 显存近 14GB(每个参数2字节)。
    • 训练时梯度存储占14GB(每个参数对应1个梯度,也是2字节)
    • 优化器Optimizer(假设是主流的AdamW)则是84GB(每个参数对应1个参数copy、一个momentum和一个variance,这三个都是float32)
      • 2byte 模型静态参数权重(以16bit存储) = 14G
      • 2byte 模型更新参数权重 (以16bit存储)= 14G
      • 2byte 梯度(以16bit存储)= 14G
      • 2byte 梯度更新(以16bit存储)= 14G
      • 4byte 一阶动量优化器更新(以32bit存储)= 28G
      • 4byte 二阶方差优化器更新(以32bit存储)= 28G
    • 目前,合计 112GB
    • 还有:前向传播时激活值,各种临时变量
    • 还与sequence length, hidden size、batch size都有关系。
  • 目前A100、H100这样主流显卡单张是放不下,更别提国内中小厂喜欢用的A6000/5000、甚至消费级显卡。

Adam + fp16 混合精度预估

【2023-6-29】LLM Training GPU显存耗用量估计,以Adam + fp16混合精度训练为例,分析其显存占用有以下四个部分

  • (1) 模型权重 Model
    • Prameters (FP16) 2 bytes
    • Gradients (FP16) 2 bytes
  • (2) 前向激活值 Activations
    • 前向过程中存储, y = w1 * x, 存储x用于计算w1梯度
    • 整体显存占用与batch有关
  • (3) 优化器 Optimizer:梯度、动量等
    • Master Weight (FP32) 4 bytes
    • Adam m (FP32) 4 bytes
    • Adam v (FP32) 4 bytes
  • (4) 临时混存 Buffer & Fragmentation

(1) 和 (3) 可以精确估计

  • 显存占用大头是 Adam 优化器,占可计算部分的 12/16=75%
  • 其次是模型参数+梯度,显存容量至少是参数量的16倍

Adam + fp16混合精度训练

结论:

  • 不考虑Activation,3090 模型容量上限是 24/16=1.5B,A100 模型容量上限是 80/16=5B
    • 假设训练过程中batchsize恒定为1,也即尽最大可能减少Activation在显存中的占用比例,使得理论计算值16Φ更接近真实的显存占用,那么24G的3090的模型容量上限是1.5B(差不多是GPT-2的水平),80G的A100的模型容量上限是5B
  • 考虑Activation,3090的模型容量上限是 0.75B,A100的容量上限是 2.5B
    • batchsize为1的训练效率非常低,batchsize大于1才能充分发挥GPU的效率,此时Activation变得不可忽略。经验之谈,一般需要给Activation预留一半的显存空间(比如3090预留12G,A100预留40G),此时3090的模型容量上限是0.75B,A100的容量上限是2.5B,我们实际测试结果接近这个值
  • [1B, 5B] 是目前市面上大多数GPU卡的分水岭区间
    • [0, 1B) 市面上绝大多数卡都可以直接硬train一发
    • [1B, 5B] 大多数卡在这个区间的某个值上触发模型容量上限,具体触发值和显存大小有关
    • (5B, ~) 目前没有卡能裸训

LLM 推理显存开销

【2024-8-24】为大型语言模型 (LLM) 提供服务需要多少 GPU 内存?

运行一个大型语言模型,需要多大GPU内存?

GPU 内存估算公式

  • $ M=(P4B)/(32/Q)1.2 $

解释

  • M 代表 GPU 内存的大小,单位是吉字节。
  • P 指的是模型中包含的参数总数。
  • 4B 指的是每个参数平均占用的存储空间,为 4 个字节。
  • Q 表示加载模型时使用的位数,可以是 16 位或者 32 位。
  • 1.2 表示在计算中加入了 20% 的额外空间以应对可能的需求。

分解公式

  • 模型参数量 (P):这个指标反映了你的模型规模。比如,如果你使用的是 LLaMA 模型,它包含 700 亿个参数,那么这个参数量就是 700 亿。
  • 参数内存需求 (4B):通常情况下,每个模型参数需要 4 个字节的存储空间,这是因为浮点数通常需要 4 个字节(即 32 位)来表示。如果你采用的是半精度(16 位)格式,那么所需的内存量会相应减少。
  • 参数位宽 (Q):这个值取决于你是以 16 位还是 32 位的精度来加载模型。16 位精度在许多大型语言模型的应用中较为普遍,因为它在保证足够精度的同时,能够降低内存的消耗。
  • 额外开销 (1.2):乘以 1.2 的系数是为了增加 20% 的额外空间,以应对在模型推理过程中可能需要的额外内存。这不仅仅是为了安全起见,更是为了确保在模型执行过程中,激活操作和其他中间结果的内存需求得到满足。

700亿个参数(以 16位精度加载)的 LLaMA 模型提供服务所需的内存:

  • M = (P * 4B)/(32/Q) * 1.2 = (70 * 4 bytes)/(32/16) * 1.2 = 168 GB
  • 单块 NVIDIA A100 GPU,尽管配备了 80 GB 显存,但仍然不足以支撑该模型的运行。为了高效地处理内存需求,至少需要两块 A100 GPU,每块都具备 80 GB 的显存容量。

内存/显存优化

显存优化技术:参考

  • 重计算(Recomputation):Activation checkpointing(Gradient checkpointing)本质上是一种用时间换空间的策略。
  • 卸载(Offload)技术:一种用通信换显存的方法,简单来说就是让模型参数、激活值等在CPU内存和GPU显存之间左右横跳。如:ZeRO-Offload、ZeRO-Infinity等。
  • 混合精度(BF16/FP16):降低训练显存的消耗,还能将训练速度提升2-4倍。
    • BF16 计算时可避免计算溢出,出现Inf case。
    • FP16 在输入数据超过65506 时,计算结果溢出,出现Inf case。

CPU卸载

当GPU内存已满时,一种选择是将暂时未使用的数据卸载到CPU,并在以后需要时将其读回(Rhu等人,2016)。数据卸载到CPU 的想法很简单,但由于它会延长训练时间,所以近年来不太流行。

激活重新计算

激活重新计算(Activation recomputation (also known as “activation checkpointing” or “gradient checkpointing”,Chen等人,2016年)是一个以计算时间为代价减少内存占用的聪明而简单的想法

混合精度训练

Narang&Micikevicius等人(2018年)介绍了一种使用半精度浮点(FP16)数字训练模型而不损失模型精度的方法。

三种避免以半精度丢失关键信息的技术:

  • 1)全精度原始权重。维护累积梯度的模型权重的全精度 (FP32) 副本, 对于向前和向后传递,数字四舍五入到半精度。主要是为了防止每个梯度更新(即梯度乘以学习率)可能太小而无法完全包含在 FP16 范围内(即 2-24 在 FP16 中变为零)的情况。
  • 2)损失缩放。扩大损失以更好地处理小幅度的梯度(见图 16), 放大梯度有助于将权重移动到可表示范围的右侧部分(包含较大值)占据更大的部分,从而保留否则会丢失的值。
  • 3)算术精度。对于常见的网络算法(例如向量点积,向量元素相加减少),可以将部分结果累加到 FP32 中,然后将最终输出保存为 FP16,然后再保存到内存中。可以在 FP16 或 FP32 中执行逐点操作。

大模型训练过程中,GPU显存占用主要分成Model States 与 Activation 两部分

混合精度训练流程:通过引入fb16以及bf16精度来减少fb32精度带来的显存消耗。

  • 存储一份fp32的parameter,momentum和variance(统称model states)
  • 在forward开始之前,额外开辟一块存储空间,将fp32 parameter减半到fp16 parameter;
  • 正常做forward和backward,在此之间产生的activation和gradients,都用fp16进行存储;
  • 用fp16 gradients去更新fp32下的model states;
  • 当模型收敛后,fp32的parameter就是最终的参数输出;

混合精度下的显存:

通常模型会使用float32(fp32)精度进行训练,但是随着模型越来越大,训练的硬件成本和时间成本急剧增加。而混合精度训练通过利用float16(fp16)的优点并规避缺点来进行训练。

fp32,fp16,bf16的区别如下图所示

优点:

  1. 降低显存占用,float16比float32小一半;
  2. 减少网络通信开销;
  3. 硬件针对fp16优化,速度更快

缺点:

  1. 下溢。float16最大的问题是”下溢”。
    • 模型更新通常随着模型训练,值往往会很小,可能会超出float16表示的精度。
    • 结果就是:大多数的模型权重都不再更新,模型难以收敛。
  2. 舍入误差。
    • 模型权重和梯度相差太大,通过梯度更新权重并进行舍入时,可能导致更新前和更新后的权重没有变化。

bf16是一种全新的数字格式,更加支持深度学习计算,但需要硬件支持,如NVIDIA A100, NVIDIA A800等

此外,官方文档中提到了AMP(Auto Mixed Precision 自动混合精度训练) ,与ZeRO不能同时使用

Int8

Int8 - bitsandbytes

Int8是个很极端的数据类型,最多只能表示-128~127的数字,并且完全没有精度。

为了在训练和inference中使用这个数据类型,bitsandbytes使用了两个方法最大程度地降低了其带来的误差:

  • vector-wise quantization
  • mixed precision decompasition

Huggingface 用动图解释了quantization的实现

借助Huggingface PEFT,使用int8训练opt-6.5B的完整流程, notebook

FP 16

Fp16 - mixed precision

  • 混合精度训练大致思路: 在 forward pass 和 gradient computation 时用 fp16 来加速,但是在更新参数时使用 fp32。
  • Pytorch 官方示例

torch fp16推理:直接使用model.half()将模型转换为fp16.

model.eval()
model.half() # 半精度

Huggingface Transformers:fp16-training

  • TrainingArguments 里声明 fp16=True
training_args = TrainingArguments(per_device_train_batch_size=4, fp16=True, **default_args)

trainer = Trainer(model=model, args=training_args, train_dataset=ds)
result = trainer.train()
print_summary(result)

压缩

中间结果通常会消耗大量内存,尽管它们只在一次向前传递和一次向后传递中需要。这两种使用之间存在明显的时间差距。因此Jain等人(2018年)提出了一种数据编码策略,将第一次使用后的中间结果在第一次传递中进行压缩,然后将其解码回来进行反向传播。

内存高效优化器

优化器内存消耗。以流行的 Adam 优化器为例,它内部需要保持动量和方差,两者都与梯度和模型参数处于同一规模,但是需要节省 4 倍的模型权重内存。

分布式机器学习实现

【2022-6-2】分布式机器学习

在深度学习时代,训练数据特别大的时候想要单卡完成训练基本是不可能的。所以就需要进行分布式深度学习。

经验

流水并行 (Pipeline Parallelism ) 是 LLM 分布式训练扩展到千卡集群以上的一个核心 feature

并行度对比

NVIDIA 在 3076 张 A100 集群上训练的 1T 参数量 LLM 使用的并行方式是:

  • Data Parallel Size = 6
  • Tensor Parallel Size = 8
  • Pipeline Parallel Size = 64

并行度最高的是 流水并行,超过 DP 和 TP 10倍左右

为什么3k卡集群主流是流水并行?

流水并行核心优势:

  • 用比较少的 Pipeline Bubble 代价 (当 gradient accumulation step 很大时可以忽略不计),较少的 Tensor Buffer 显存代价,以及非常低的通信开销,将大模型分割在不同的 Group 中。 大幅减少了单张 GPU 上的 weight tensor 大小(数量) 和 Activation tensor 大小(数量)。
  • 跟 Tensor Parallel 相比, Pipeline Parallel 的通信代价很低且可以被 overlap, Tensor Parallel 虽然也能切分模型大小,但是需要全量数据(没有减少 Activation tensor 大小),另外极高的通信频率和通信量使得 Tensor Parallel 只能在机器内 8 张卡用 NVLink 等高速互联来实现,跨机的 TP 会严重拖慢速度。
  • 不仅如此, Pipeline Parallel 还将 Data Parallel 的模型更新限定在一个很小的范围内(比如六台机器), DP 所需的 AllReduce 通信会随着机器数量增多而变慢。 PP 也让 DP 所需同步的模型梯度大小变小了,大大减缓了模型更新对于训练速度的影响。

因此 Pipeline Parallel 是让模型可以达到千亿、集群可以扩充到千卡以上的一个最重要的特性。

流水并行有很重要的约束条件:

  • 需要一个 规整对称的、线性顺序的网络结构。

GPT 就是这样一个典型的网络结构:

  • 完全一样的 Transformer Layer 顺序堆叠,没有分叉和不对称情况,当均匀切分 Layer 时,各个 Stage 的前向/反向计算时间均一致。

作者:成诚 链接:https://www.zhihu.com/question/588325646/answer/3422090041 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

流水并行训练时的 time line 参考如下:

(反向的计算时间是前向的两倍)整个集群最高效的训练时间段是 step 4、5、6、7 的前向 和 step 0、1、2、3 的反向同时在所有 stage 上并行计算的时候,这个时候集群没有空闲,全部都在并行执行。 当我们增加 acc step (比如从 8 增加到 64)时,中间部分完美并行的时间段占比就会更长, bubble time 的占比就会越来越小。

而 T5 的网络结构比 GPT 要复杂很多, T5 是 Encoder-Decoder 架构,整个网络分为两大块,且 Encoder 和 Decoder 的 Transformer Layer 参数大小、Attention 计算量、Context Length 等均不一致,导致 Encoder 的理论计算量要比 Decoder 大很多(整个网络不是均匀对称的)。 更要命的是, T5 Encoder 的输出要发给每个 Decoder Layer,网络结构不是线性而是有大量的分叉,前向反向之间包含了复杂的数据依赖关系, 会导致流水并行中,各个 Stage 之间会产生大量的、非对称的、间隔跨多个 Stage 的数据依赖,更加剧了流水并行的 load balance 问题。

所以直接使用 Megatron 跑 T5 的 Pipeline Parallelism,会从 nsys prof 时间线上看到大量的缝隙,各个 Stage 之间在互相等待,无法真正流水并行起来。

如果不用 Pipeline Parallelism 来训练 T5,那么只能借助: DP、TP 和 ZeRO 来进行并行优化了, 这就约束了 T5 的所有 Layer 都必须放在每一个 GPU 上,这种方式在 13B 量级的模型上是 OK 的,但是再往上扩展到 100B、1T 量级就不 work 了。

同时由于 TP 只能开到 8 (跨机器也会慢几倍), 在千卡 GPU 集群以上,大量的 DP 带来的通信变慢的影响也很严重(ZeRO-2/3 会大幅加剧这种通信开销)。 所以我们才说, 虽然 T5 的理论计算量相较于 GPT 没有增加很多,但是在千亿参数、千卡集群以上规模的时候,T5 的实际训练效率比 GPT 慢很多倍。即使到现在,也没有一个超过 11B 的 T5 模型发布, 而 11B 恰好是一个不借助 PP,仅通过 ZeRO + TP 就可以训练的模型大小,避免了 T5 的模型结构非对称性对于 PP 的灾难性影响。

基本原理

无论哪种机器学习框架,分布式训练的基本原理都是相同的。可以从并行模式架构模式同步范式物理架构通信技术等五个不同的角度来分类。

更多信息见优质paper,把 DP(Data Parallel)、MP(Model Parallel)、PP(Pipeline Parallel)各个方面讲的很透彻

并行模式

分布式训练目的:将原本巨大的训练任务拆解成多个子任务,每个子任务在独立的机器上单独执行。

大规模深度学习任务的难点在于:

  • 1) 训练数据量巨大:将数据拆解成多个小模型分布到不同的node上。→ 数据并行
  • 2) 训练模型的参数巨大:将数据集拆解分布到不同的node上。→ 模型并行
    • NLP的预训练模型实在太大了
并行模式   图解
数据并行 单机多卡用DP(PS),多级多可用DDP(Ring Allreduce)
模型并行  
流水线并行    

数据并行(DP&DDP)

数据并行相对简单,N个node(worker)构成一个分布式集群,每个worker处理1/N的数据。

  • 理论情况下能达到线性的加速效果。
  • TF、torch、Horovod都可以在原生支持或者微小的改动实现数据并行模式。

DP(单机)+DDP(多机)

数据并行(DP&DDP)

  • DP(Data Parallelism):早期数据并行模式,一般采用参数服务器(Parameters Server)编程框架。实际中多用于单机多卡
  • DDP(Distributed Data Parallelism):分布式数据并行,采用Ring AllReduce 通讯方式,多用于多机多卡场景。

DP 单机数据并行

数据并行本质

  • 单进程多线程实现方式,只能实现单机训练, 不算严格意义上的分布式训练

多个GPU 情况下,将模型分发到每个GPU上去,每个GPU都保留完整模型参数。

  • 每个GPU加载全部模型(Parameter、Grad、Optimizer、Activation、Temp buffer)
  • 将每个batch样本平均分配到每个GPU上进行梯度计算
  • 然后汇总每个GPU上的梯度
  • 将汇总梯度重新分发到每个GPU上,每个GPU模型根据汇总的梯度进行模型参数更细。

K个GPU并数据并行训练过程如下:

  • 任何一次训练迭代中,给定的随机的小批量样本都将被分成K个部分,并均匀地分配到GPU上;
  • 每个GPU根据分配给它的小批量子集,计算模型参数的损失和梯度;
  • 将个GPU中的局部梯度聚合,以获得当前小批量的随机梯度;
  • 聚合梯度被重新分发到每个GPU中;
  • 每个GPU使用这个小批量随机梯度,来更新所维护的完整的模型参数集。

数据并行是在每个worker上存储一个模型的备份,在各个worker 上处理不同的数据子集。然后需要规约(reduce)每个worker的结果,在各节点之间同步模型参数。

  • 这一步会成为数据并行的瓶颈,因为如果worker很多的情况下,worker之间的数据传输会有很大的时间成本。

参数同步后,需要采用不同的方法进行参数更新:

  • 参数平均法:最简单的一种数据平均化
  • 更新式方法

若采用参数平均法,训练的过程如下所示:基于模型的配置随机初始化网络模型参数

  • 将当前这组参数分发到各个工作节点
  • 在每个工作节点,用数据集的一部分数据进行训练
  • 将各个工作节点的参数的均值作为全局参数值
  • 若还有训练数据没有参与训练,则继续从第二步开始

更新式方法与参数平均化类似,主要区别在于,在参数服务器和工作服务器之间传递参数时,更新式方法只传递更新信息(梯度和张量)。

问题:

  • 负载不均衡,主GPU负载大
  • PS 架构通信开销大

DDP 分布式数据并行

DDP (Distribution Data Parallel)

  • AllReduce 架构,在单机和多机上都可以使用。
  • 负载分散在每个gpu节点上,通信成本是恒定的,与 GPU 数量无关。

模型并行(model parallesim)

模型参数过大,单个 GPU无法容纳模型参数时,就需要模型并行, 将模型拆分到多个 GPU 训练。

模型并行相对复杂

  • 原理:分布式系统中的不同worker负责网络模型的不同部分
  • 例如,神经网络的不同层被分布到不同worker或者同一层的不同参数被分配到不同worker上。
  • 对于TF这种框架,可以拆分计算图成多个最小依赖子图到不同的worker上。同时在多个子图之间通过通信算子来实现模型并行。

但是模型并行实现起来比较复杂。工业界还是以数据并行为主。

层间 & 层内

Model Parallel主要分两种:intra-layer拆分 和 inter-layer拆分

  • inter-layer拆分:对模型做网络上的拆分,将每一层或者某几层放在一个worker上单独训练。
    • 缺点:模型训练串行,整个模型的效率取决于最慢的那一层,存在资源浪费
  • intranet-layer拆分:深度学习的网络结构基本都是一层层的。常规的卷积、池化、BN等等。如果对某一层进行了拆分,那么就是intra-layer拆分。对单层的拆分其实就是拆分这一层的matrix运算。
    • 参考论文:Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

对比

  • 层间并行: 流水线并行
  • 层内并行: 张量并行
概念 中文 图解
intra-layer and inter-layer 层间并行和层内并行
orthogonal and complimentary 正交和互补

模型并行通常分为张量并行(纵向切分)以及流水线并行(横向切分)

  • 流水线并行(Pipeline model parallesim)
    • 朴素拆分方式: 将模型各层分组后装载到各个GPU上去,GPU之间进行串行计算
      • 缺点: GPU 利用率太低,当一个GPU进行计算时,其他层的GPU都闲置。
    • 改进: 谷歌提出了GPipe 流水线并行(Pipeline model parallesim ), 引入micro-batches (MBS)的概念,会提升GPU利用率
    • 问题: 流水线最大的问题, 无法充分利用GPU资源,training过程中会出现非预期的Bubble
  • 张量并行(Tensor Model Parallelism)
    • 张量并行(TP)是模型并行一种形式,流水线并行按网络层切分,张量并行按矩阵切分。
    • 2019年,NVIDIA发布《Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM》论文,提出了张量并行方法
    • 核心思想: 每个GPU仅处理矩阵一部分,当算子需要整个矩阵的时候再进行矩阵聚合。无论是横向切分还是竖向切分,都可以将切分后的矩阵放到不同GPU上进行计算,最后将计算的结果再合并。

大模型主要结构都是Transformer模型,Transformer核心模块网路结构:anttention层+残差连接,MLP层+残差连接。

  • MLP层: 数学表达如下:Y = GeLU(XA)Z = Dropout(YB)
  • Attention层: 数学表达如下:Y = Self-Attention(X)Z = Dropout(YB), 多头注意力每个头都是独立的,因此张量切分更方便

大模型训练时,ZeRO支持将模型显存内存占用划分到多张卡或者多个节点。

示例

【2023-8-28】模型并行最佳实践(PyTorch)

两个GPU上运行此模型,只需将每个线性层放在不同的GPU上,然后移动输入(input)和中间输出(intermediate outputs)以匹配层设备(layer devices)。

import torch
import torch.nn as nn
import torch.optim as optim

class ToyModel(nn.Module):
  """
    模型并行示例
  """

  def __init__(self):
    # 模型定义修改: 只需增加 to(device)
    super(ToyModel, self).__init__()
    self.net1 = torch.nn.Linear(10, 10).to('cuda:0')  # 将net1放置在第1个GPU上
    self.relu = torch.nn.ReLU()
    self.net2 = torch.nn.Linear(10, 5).to('cuda:1')   # 将net2放置在第2个GPU上

  def forward(self, x):
    x = self.relu(self.net1(x.to('cuda:0')))
    return self.net2(x.to('cuda:1'))

注意 ToyModel

  • 除了5个用于将线性层(linear layers)和张量(tensors)放置在适当设备上的to(device)调用之外,以上内容与在单个GPU上实现该功能非常相似。那是模型中唯一更改地方(即to(device) )。
  • 在 backward()和 torch.optim 会自动关注梯度(gradients),模型如同一个GPU。
  • 调用损失函数时,只需确保标签(label)与输出(output)在同一设备(on the same device)上。
model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.paraeters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1') # ToyMode 的 output 是在 'cuda:1' 上,此处的 label 也应该置于 'cuda:1' 上
loss_fn(outputs,labels).backward()
optimizer.step()

只需更改几行,就可以在多个GPU上运行现有的单GPU模块。

如何分解 torchvision.models.reset50() 为两个GPU。

  • 从现有 ResNet模块继承,并在构建过程中将层拆分为两个GPU。
  • 然后覆盖 forward方法来缝合两个子网,通过相应地移动中间输出。
from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            # 模型拆分
            self.layer1,
            self.layer2
        ).to('cuda:0')  # 放置在第1个GPU上

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')  # 放置在第2个GPU上

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

对于模型太大而无法放入单个GPU的情况,上述实现解决了该问题。但是,如果模型合适,model parallel 将比在单个GPU上运行要

  • 因为在任何时间点,两个GPU中只有1个在工作,而另一个在那儿什么也没做。
  • 在 layer2 和 layer3之间,中间输出需要从 cuda:0 复制到 cuda:1,这使得性能进一步恶化。
import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128

def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to('cuda:0'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()

两个GPU中的一个会处于空闲状态。怎么优化?

  • 将每个批次进一步划分为拆分流水线,当1个拆分到达第2子网时,可以将下一个拆分馈入第一子网。这样,两个连续的拆分可以在两个GPU上同时运行。

流水线输入(Pipelining Inputs)加速

  • 将每批次 120-image 进一步划分为 20-image 。当PyTorch异步启动CUDA操作时,该实现无需生成多个线程即可实现并发。
class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
     'mp_vs_rn_vs_pp.png')

设备到设备的张量复制操作在源设备和目标设备上的当前流(current streams)上同步。如果创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量可能导致不确定的行为。上面的实现仅在源设备和目标设备上都使用默认流,因此没有必要强制执行其他同步。

流水线并行

数据并行还是模型并行都会在相应机器之间全连接通信,当机器数量增大时,通信开销和时延会大到难以忍受

流水线(管道)并行既解决了超大模型无法在单设备装下的难题,又解决了机器之间的通信开销的问题

  • 每个阶段(stage) 和下一个阶段之间仅有相邻的某一个 Tensor 数据需要传输
  • 每台机器的数据传输量跟总的网络大小、机器总数、并行规模无关。

流水线并行(Pipeline model parallesim)

  • 朴素拆分方式: 将模型各层分组后装载到各个GPU上,GPU之间进行串行计算
  • 缺点: GPU 利用率太低,当1个GPU进行计算时,其他层GPU都闲置。

改进方法如下

  • GPipe
  • PipeDream

G-pipe

谷歌提出 G-pipe 流水线并行(Pipeline model parallesim ), 引入micro-batches (MBS)的概念,会提升GPU利用率

  • F-then-B 调度方式: 原 mini-batch(数据并行切分后的batch)划分成多个 micro-batchmini-batch再切分后的batch),每个 pipeline stage (流水线并行的计算单元)先整体进行前向计算,再进行反向计算。同一时刻分别计算模型的不同部分,F-then-B 可以显著提升设备资源利用率。
  • F-then-B 模式由于缓存了多个 micro-batch 的中间变量和梯度,显存的实际利用率并不高。
  • 解决: 1F1B (在流水线并行中,pipeline stage 前向计算和反向计算交叉进行的方式)流水线并行方式。1F1B 模式下,前向计算和反向计算交叉进行,可以及时释放不必要的中间变量。

PipeDream

PipeDream 在单个 GPU 上短暂运行性能分析后,自动决定怎样分割这些 DNN 算子,如何平衡不同 stage 之间的计算负载,而同时尽可能减少目标平台上的通信量。

PipeDream将DNN 层划分为多个阶段 —— 每个阶段(stage)由模型中的一组连续层组成。

  • PipeDream把模型的不同的阶段部署在不同的机器上,每个阶段可能有不同的replication。该阶段对本阶段中所有层执行向前和向后传递。
  • PipeDream将包含输入层的阶段称为输入阶段,将包含输出层的阶段称为输出阶段。

virtual pipeline

virtual pipeline 是 Megatron-2 论文中最主要的一个创新点。

  • 传统的 pipeline 并行通常会在一个 Device 上放置几个 block,为了扩展效率,在计算强度和通信强度中间取一个平衡。
  • 但 virtual pipeline 在 device 数量不变的情况下,分出更多的 pipeline stage,以更多的通信量,换取空泡比率降低,减小了 step e2e 用时。

张量并行(Tensor Parallelism)

流水线并行主要集中在多层神经网络架构训练上,对于Transformer架构的模型(如BERT,GPT等),MultiHead Attention LayerMLP的计算量翻了几倍,如果继续按管线切分模型, 可能单层参数都无法被显存装载,因此需要横着把同一层的模型切分开来,这便是张量并行

  • 层间并行: 流水线并行
  • 层内并行: 张量并行

分布式张量计算正交且更通用,将张量操作划分到多个设备上,以加速计算或增加模型大小。

  • Masked Multi Self AttentionFeed Forward 都进行切分以并行化,利用Transformers网络的结构,通过添加一些同步原语来创建一个简单的模型并行实现。

张量并行(Tensor Model Parallelism)

  • 张量并行(TP)是模型并行一种形式,流水线并行按网络层切分,张量并行按矩阵切分。
  • 2019年,NVIDIA发布《Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM》论文,提出了张量并行方法
  • 核心思想: 每个GPU仅处理矩阵一部分,当算子需要整个矩阵的时候再进行矩阵聚合。无论是横向切分还是竖向切分,都可以将切分后的矩阵放到不同GPU上进行计算,最后将计算的结果再合并。

张量并行最有名的是: MegatronDeepspeed

混合并行

随着训练设备的增加,多个worker之间的通信成本增加,模型Reduce的成本也越来越大,数据并行的瓶颈也随之出现。故有学者提出混合并行(数据并行+模型并行)

架构模式

分布式训练上会频繁用到规约(AllReduce)操作。

all-reduce 操作有多种方式实现:

  • 树状结构:数据在进程间以树状结构进行归约,每个非叶子节点负责将其子节点的数据归约后再传递给其父节点。
  • 环形结构:进程之间形成一个环,数据在环中按顺序传递并归约。
  • 直接归约:所有进程直接将数据发送给一个中心节点,该节点完成归约后将结果发送回所有进程。

all-reduce 操作性能对分布式计算的效率至关重要,因此优化这一操作是分布式系统设计中的一个研究热点。使用最多的实现方式是百度提出的 Ring AllReduce 算法,该方法属于环状结构实现的一种。

主流的分布式架构主要分为参数服务器(ParameterServer) 和基于规约(Reduce)两种模式。早期还有基于MPI的方式,不过现在已经很少用了。

传统 parameter server: server和client方式

  • client通过计算分配给自己的数据,产生梯度,传给server
  • server 聚合,然后把参数再传给client

这个方式的弊端: server容易成为瓶颈

  • server通信量太大。
  • 一个client失败,会导致其他client等待。

Ring all reduce 一种分布式方式

  • 各个节点分配通信量。
  • 总的通信量和ps没啥变化,但是通信的压力平摊到各个GPU上了,GPU之间的通信可以并行进行。

假如,GPU数量是N,把模型参数分成N份,每个GPU要存放整个参数。每个GPU也要分配训练数据。

  • 当一次迭代,N个GPU之间要经过一个scatter和gather操作,reduce-scatter是将不同gpu上对应的参数的gradient相加,一共需要通讯(N-1)次。
  • All-gather 将合并完整的参数,传到其他gpu上,需要通讯(N-1)次。
  • 一次all reduce,单卡通信量为2*sita。

PS:参数服务器

ParameterServer模式是一种基于reduce和broadcat算法的经典架构。

  • 其中一个/一组机器作为PS架构的中心节点,用来存储参数和梯度
  • 在更新梯度的时候,先全局reduce接受其他worker节点的数据,经过本地计算后(比如参数平均法),再broadcast回所有其他worker。
  • 论文: Parameter Server for Distributed Machine Learning
  • 中文解读

PS架构的问题在于多个worker与ps通信,PS本身可能存在瓶颈

  • 随着worker数量的增加,整体通信量也线性增加,加速比也可能停滞在某个点位上。

基于规约 Reduce模式

基于规约的模式解决了上述的问题,最典型的是百度提出的 Ring-AllRuduce。

  • 多个Worker节点连接成一个环,每个Worker依次把自己的梯度同步给下一个Worker,经过至多 2*(N-1) 轮同步,就可以完成所有Worker的梯度更新。
  • 这种方式下所有节点的地位是平等的,因此不存在某个节点的负载瓶颈,随着Worker的增加,整体的通信量并不随着增加。加速比几乎可以跟机器数量成线性关系且不存在明显瓶颈。

目前,越来越多的分布式训练采用Reduce这种模式。Horovod中主要就是用的这种分布式架构。

同步范式

实际训练过程中可能遇到各种问题,比如:部分节点资源受限、卡顿、网络延时等等

因此梯度同步时就存在“木桶“效应,即集群中的某些worker比其他worker更慢,导致整个训练pipeline需要等待慢的worker,整个集群的训练速度受限于最慢机器的速度。

因此梯度同步有“同步”(sync)、“异步”(Async)和混合三种范式。

  • 同步范式:只有所有worker完成当前的计算任务,整个集群才会开始下一次迭代。
    • TF中同步范式使用SyncReplicasOptimizer优化器
  • 异步模式刚好相反,每个worker只关心知己的进程,完成计算后就尝试更新,能与其他多个worker同步梯度完成取决于各worker当前时刻的状态。其过程不可控,有可能出现模型正确性问题。(可在训练时logging对比)
  • 混合范式结合以上两种情况,各个worker都会等待其他worker的完成,但不是永久等待,有timeout的机制。如果超时了,则此情况下相当于异步机制。并且没来得及完成计算的worker,其梯度则被标记为“stale”而抛弃或另做处理。

梯度累加

Gradient Accumulation 把一个大 Batch 拆分成多个 micro-batch, 每个 micro-batch 前后向计算后的梯度累加,在最后一个micro-batch累加结束后,统一更新模型。

micro-batch数据并行高度相似性:

  • 数据并行是空间上,数据被拆分成多个 tensor,同时喂给多个设备并行计算,然后将梯度累加在一起更新;
  • 而 micro-batch 是时间上的数据并行,数据被拆分成多个 tensor, 按照时序依次进入同一个设备串行计算,然后将梯度累加在一起更新。当总的 batch size 一致,且数据并行的并行度和 micro-batch 的累加次数相等时,数据并行和 Gradient Accumulation 在数学上完全等价。

Gradient Accumulation 通过多个 micro-batch的梯度累加, 使下一个 micro-batch 的前向计算不需要依赖上一个 micro-batch 的反向计算,因此可以畅通无阻的进行下去(当然在一个大 batch 的最后一次 micro-batch 还是会触发这个依赖)。

Gradient Accumulation 解决了很多问题:

  • 单卡下,Gradient Accumulation 将一个大 batch size 拆分成等价的多个小 micro-batch ,从而达到节省显存的目的。
  • 数据并行下,Gradient Accumulation 解决了反向梯度同步开销占比过大的问题(随着机器数和设备数的增加,梯度的 AllReduce 同步开销也加大),因为梯度同步变成了一个稀疏操作,因此可以提升数据并行的加速比。
  • 流水并行下, Gradient Accumulation 使得不同 stage 之间可以并行执行不同的 micro-batch, 从而让各个阶段的计算不阻塞,达到流水的目的。如果每个 micro-batch 前向计算的中间结果(activation)被后向计算所消费,则需要在显存中缓存 8多份(梯度累加的次数)完整的前向 activation。这时就不得不用另一项重要的技术:激活检查点(activation checkpointing)。

物理架构

物理架构主要是 GPU架构,即:单机单卡、单机多卡、多机单卡、多机多卡(最典型)

  • 单机单卡:常规操作
  • 单机多卡:利用一台GPU上的多块GPU进行分布式训练。数据并行和模型并行皆可。整个训练过程一般只有一个进程,多GPU之间的通信通过多线程的方式,模型参数和梯度在进程内是共享的(基于NCCL的可能不大一样)。这种情况下基于Reduce的架构比PS架构更合适一些,因为不需要一个显式的PS,通过进程内的Reduce即可完成梯度同步。
  • 多机单卡:操作上与多机多卡基本一致
  • 多机多卡:多机多卡是最典型的分布式架构,所以它需要较好的进程间的通讯机制(多worker之间的通信)。

内容:

  • 分布式训练的基本原理
  • TensorFlow的分布式训练
  • PyTorch的分布式训练框架
  • Horovod分布式训练

分布式实现

超大规模语言模型主要有两条技术路线:

  • (1) TPU + XLA + TensorFlow/JAX : Google主导,由于TPU和自家云平台GCP深度绑定
  • (2) GPU + PyTorch + Megatron-LM + DeepSpeed: NVIDIA、Meta、MS大厂加持,社区氛围活跃

(1) 对于非Googler 只可远观而不可把玩,(2) 更受到群众欢迎。

TF分布式训练方法

TensorFlow主要的分布式训练的方法有三种:

  1. Customer Train Loop:最原始,由框架工程师自己开发
  2. Estimator + Strategy:高级API,不用关心底层硬件
  3. Keras + Strategy:最新出的keras的高级API
  • 实际的开发工作中,分布式的工作最好是交给框架,而工程师本身只需要关注任务模型的pipeline就行了。
  • 最经典的是Spark框架,工程师只需要关注数据处理的workflow,分布式的大部分工作都交给框架。深度学习的开发同样如此。

各种方式评价

  • 第一种方式太过原生,整个分布式的训练过程完全交给工程师来处理,代码模块比较复杂,这里不做赘述。
  • 第二种方式,Estimator是TF的一个高级API,在分布式场景下,其最大的特点是单机和分布式代码一致,且不需要考虑底层的硬件设施。Strategy是tensorflow根据分布式训练的复杂性,抽象出的多种分布式训练策略。TF1.x和TF2.x接口变化较大,不同版本名字可能不一样,以实际使用版本为准。用的比较多的是:
    • MirroredStrategy:适用于单机多卡、数据并行、同步更新的分布式训练,采用Reduce的更新范式,worker之间采用NCCL进行通信。
    • MultiWorkerMirroredStrategy:与上面的类似,不同的是这种策略支持多机多卡、数据并行、同步更新的分布式策略、Reduce范式。在TF 1.15版本里,这个策略叫CollectiveAllReduceStrategy。
    • ParameterServerStrategy:经典的PS架构,多机多卡、数据并行、同步/异步更新
    • 使用Estimator+Strategy 实现分布式训练,参考代码
  • 第三种方式 Keras + Strategy 是Tensorflow最新官方推荐的方案。主要是利用keras的高级API,配合Strategy实现多模式的分布式训练。

后两种方法都需要传入TF_CONFIG参数,没有就是单机的训练方式。Strategy会自动读取环境变量并应用相关信息。

TF_CONFIG的配置如下

单机单卡

单机单卡是最普通的情况,当然也是最简单的。

使用步骤

  • 检查可用GPU数量
  • 获取一个GPU实例
  • 迁移:将 数据/模型 推送到GPU上

TF

示例代码如下:

#coding=utf-8
#单机单卡,对于单机单卡,可以把参数和计算都定义再gpu上,不过如果参数模型比较大,显存不足等情况,就得放在cpu上
import  tensorflow as tf
with tf.device('/cpu:0'):#也可以放在gpu上
    w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
    b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
with tf.device('/gpu:0'):
    addwb=w+b
    mutwb=w*b
init=tf.initialize_all_variables()
with tf.Session() as sess:
    sess.run(init)
    np1,np2=sess.run([addwb,mutwb])
    print np1,np2

PyTorch

pytorch实现

  • 封装程度非常高,只需保证即将被推到 GPU 的数据是张量(Tensor)或者模型(Module),就可以用 to() 函数快速进行实现。
import torch
from torch import nn

data = torch.ones((3, 3)) # 定义数据(张量)
print(data.device)
net = nn.Sequential(nn.Linear(3, 3)) # 定义模型

print(torch.cuda.is_available())     # 判断当前的机器是否有可用的 GPU
print(torch.cuda.device_count())     # 目前可用的 GPU 的数量。
# 使用第一块GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # cuda: 0 表示使用的是第一块 GPU。当然可以不用声明“:0”,默认就从第一块开始
print(device) # cpu 或 0
# 数据迁移:将data推到(迁移)gpu上
data_gpu = data.to(device)
print(data_gpu.device)
# 模型迁移:model推到gpu
net.to(device)

单机多卡

TF

  • 单机多卡,只要用device直接指定设备,就可以进行训练,SGD采用各个卡的平均值
  • 问题:除了取均值,还有别的方式吗?
#coding=utf-8
#单机多卡:一般采用共享操作定义在cpu上,然后并行操作定义在各自的gpu上,比如对于深度学习来说,我们一般把参数定义、参数梯度更新统一放在cpu上,各个gpu通过各自计算各自batch数据的梯度值,然后统一传到cpu上,由cpu计算求取平均值,cpu更新参数。具体的深度学习多卡训练代码,请参考:https://github.com/tensorflow/models/blob/master/inception/inception/inception_train.py
import  tensorflow as tf
  
with tf.device('/cpu:0'):
    w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
    b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
with tf.device('/gpu:0'):
    addwb=w+b
with tf.device('/gpu:1'):
    mutwb=w*b
  
ini=tf.initialize_all_variables()
with tf.Session() as sess:
    sess.run(ini)
    while 1:
        print sess.run([addwb,mutwb])
  • 多个 GPU 上运行 TensorFlow,则可以采用多塔式方式构建模型,其中每个塔都会分配给不同 GPU。例如:
# Creates a graph.
c = []
for d in ['/device:GPU:2', '/device:GPU:3']:
  with tf.device(d):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3])
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2])
    c.append(tf.matmul(a, b))
with tf.device('/cpu:0'):
  sum = tf.add_n(c)
# Creates a session with log_device_placement set to True.
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# Runs the op.
print(sess.run(sum))
  • 【2020-5-20】每个gpu的梯度要累加起来,单独计算
        # train op def
        tower_grads = []
        for i in xrange(FLAGS.num_gpus):
            with tf.device('/gpu:{}'.format(i)):
                with tf.name_scope('tower_{}'.format(i)):
                    next_batch = dhs.get_next_batch()
                    cnn.inference(
                        next_batch[0], next_batch[1], next_batch[2],
                        dropout_keep_prob=FLAGS.dropout_keep_prob,
                        input_dropout_keep_prob=FLAGS.input_dropout_keep_prob,
                        phase_train=True)
                    grads = optimizer.compute_gradients(cnn.loss)
                    tower_grads.append(grads)
        grads = average_gradients(tower_grads)
        train_op = optimizer.apply_gradients(grads, global_step=global_step)

def average_gradients(tower_grads):
    """
    Calculate the average gradient for each shared variable across all towers.
    Note that this function provides a synchronization point across all towers.
    NOTE: This function is copied from cifar codes in tensorflow tutorial with minor
    modification.
    Args:
        tower_grads: List of lists of (gradient, variable) tuples. The outer list
            is over individual gradients. The inner list is over the gradient
            calculation for each tower.
    Returns:
       List of pairs of (gradient, variable) where the gradient has been averaged
       across all towers.
    """
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
        # Note that each grad_and_vars looks like the following:
        #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
        grads = []
        for g, _ in grad_and_vars:
            # Add 0 dimension to the gradients to represent the tower.
            # NOTE: if batch norm applied, the grad of conv-maxpool-n/b will be
            #       None
            if g is None:
                continue
            expanded_g = tf.expand_dims(g, 0)

            # Append on a 'tower' dimension which we will average over below.
            grads.append(expanded_g)

        # Average over the 'tower' dimension.
        if grads:
            grad = tf.concat(axis=0, values=grads)
            grad = tf.reduce_mean(grad, 0)
        else:
            grad = None

        # Keep in mind that the Variables are redundant because they are shared
        # across towers. So .. we will just return the first tower's pointer to
        # the Variable.
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads

PyTorch

PyTorch 多种解决方案,最简单常用:nn.DataParallel()

  • module :定义的模型
  • device_ids 即为训练模型时用到的 GPU 设备号,
  • output_device 表示输出结果的 device,默认为 0 也就是第一块卡。

工作过程

  • 在每个迭代训练的Forward过程中:nn.DataParallel都自动将输入按照GUP数量进行split;然后复制模型参数到各个GPU上;分别进行正向计算后将得到网络输出output_x;最后将结果concat拼接到一起送往0号卡中。
  • 在Backward过程中:先由0号卡计算loss函数,通过loss.backward()得到损失函数相于各个gpu输出结果的梯度grad_l1 … gradln;接下来0号卡将所有的grad_i送回对应的GPU_i中;然后GPU们分别进行backward得到各个GPU上面的模型参数梯度值gradm1 … gradmn;最后所有参数的梯度汇总到GPU0卡进行update。

多卡训练时,output_device 的卡所占的显存明显大一些。

  • 因为使用 DataParallel 时,数据并行,每张卡获得的数据都一样多,但是所有卡的 loss 都会在第 output_device 块 GPU 进行计算,这导致了 output_device 卡的负载进一步增加。

只需要一个 DataParallel 函数就可以将模型和数据分发到多个 GPU 上。

  • 但是还是需要了解这内部的运行逻辑, 遇到了诸如时间计算、资源预估、优化调试问题的时候,可以更好地运用 GPU
import os
from torch import nn
import torch

class ASimpleNet(nn.Module):
    def __init__(self, layers=3):
        super(ASimpleNet, self).__init__()
        self.linears = nn.ModuleList([nn.Linear(3, 3, bias=False) for i in range(layers)])   # 设备有几个,就创建几个模型分支,
    def forward(self, x):     # 模型前馈实际处理过程
        print("forward batchsize is: {}".format(x.size()[0]))
        x = self.linears(x)
        x = torch.relu(x)
        return x

device=os.environ['CUDA_VISIBLE_DEVICES']  
# os.environ['CUDA_VISIBLE_DEVICES']="0,2"  指定具体的设备
# print("CUDA_VISIBLE_DEVICES :{}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

batch_size = 16
inputs = torch.randn(batch_size, 3)            # 创建16个数据
labels = torch.randn(batch_size, 3)            # 创建16个数据标签
inputs, labels = inputs.to(device), labels.to(device)      # 数据迁移到设备上,返回数据总接口(应该是一个列表/字典,数据片段-GPU对应关系)
net = ASimpleNet()                             # 模型实例化
net = nn.DataParallel(net)                     # 模型分布结构化
net.to(device)                                 # 模型迁移到设备上,返回一个模型总接口(应该是一个列表/字典,子模型-GPU对应关系)
for epoch in range(1):       # 训练次数自行决定
    outputs = net(inputs)    #  数据统一入口;数据怎么分配,模型参数怎么同步,内部机制自行来处理
# 输出:
# CUDA_VISIBLE_DEVICES : 3, 2, 1, 0
# forward batchsize is: 4
# forward batchsize is: 4
# forward batchsize is: 4
# forward batchsize is: 4

注意:有几个GPU,建几个分支(同结构模型),这样就可以分散到各个GPU上。

CUDA_VISIBLE_DEVICES 得知了当前程序可见的 GPU 数量为 4,而创建的 batch size 为 16,输出每个 GPU 上模型 forward 函数内部的 print 内容,验证了每个 GPU 获得的数据量都是 4 个。

  • DataParallel 会自动将数据切分、加载到相应 GPU,将模型复制到相应 GPU,进行正向传播计算梯度并汇总。

提示

  • DataParallel的整个并行训练过程利用python多线程实现

由以上工作过程分析可知,nn.DataParallel 无法避免的问题:

  • 负载不均衡问题。gpu_0所承担的任务明显要重于其他gpu
  • 速度问题。每个iteration都需要复制模型且均从GPU0卡向其他GPU复制,通讯任务重且效率低;python多线程GIL锁导致的线程颠簸(thrashing)问题。
  • 只能单机运行。由于单进程的约束导致。
  • 只能切分batch到多GPU,而无法让一个model分布在多个GPU上。当一个模型过大,设置batchsize=1时其显存占用仍然大于单张显卡显存,此时就无法使用DataParallel类进行训练。

因此官方推荐使用 torch.nn.DistributedDataParallel 替代 nn.DataParallel

多机多卡

一、基本概念

  • Cluster、Job、task概念:三者可以简单的看成是层次关系
  • task相当于每台机器上的一个进程,多个task组成job;
  • job又有两种:ps参数服务、worker计算服务,组成cluster。

二、同步SGD与异步SGD

  • 1、同步更新:各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,把梯度值统一送到ps服务机器中,由ps服务机器求取梯度平均值,更新ps服务器上的参数。
    • 如下图所示,可以看成有四台电脑,第一台电脑用于存储参数、共享参数、共享计算,可以简单的理解成内存、计算共享专用的区域,也就是ps job;另外三台电脑用于并行计算的,也就是worker task。
    • 这种计算方法存在的缺陷是:每一轮的梯度更新,都要等到A、B、C三台电脑都计算完毕后,才能更新参数,也就是迭代更新速度取决与A、B、C三台中,最慢的那一台电脑,所以采用同步更新的方法,建议A、B、C三台的计算能力都不想。
  • 2、异步更新:ps服务器收到只要收到一台机器的梯度值,就直接进行参数更新,无需等待其它机器。这种迭代方法比较不稳定,收敛曲线震动比较厉害,因为当A机器计算完更新了ps中的参数,可能B机器还是在用上一次迭代的旧版参数值。

多机多卡讲解

【2024-4-18】大模型多机多卡训练经验总结

LLM多机多卡训练教程好少,有些还拿 torch.distributed.launch 来做,殊不知早就改用 torchrun 了。

环境准备: 以2台机器为例

  • 首先, 2台机器要能免密登录,编辑/etc/hosts文件,加入node信息:
# vi /etc/hosts
ip1 node01
ip2 node02

然后, 两个node分别执行以下操作, 生成私钥和公钥:

ssh-keygen -t rsa

然后, 全部回车,采用默认值。再互相拷贝秘钥:

ssh-copy-id root@ip1
ssh-copy-id root@ip2

分别在2台机器上试试互相ssh,如果无密码输入要求直接登录到另一台服务器则说明配置成功。

2台机器环境必须保持一致,包括python版本,训练所需依赖包等。

还需确保安装了pdsh:

apt-get install pdsh

多机训练

使用 torchrun,毕竟单张GPU有80G显存,7B模型单卡完全放得下。

  • 假设node01为master,node02需要有相同的模型权重和代码,可以直接在master用scp拷贝过去。

准备工作完成后, 可以启动训练命令

  • 首先在node01(master)执行如下命令(非完整,仅供参考,使用deepspeed ZeRO-2):
torchrun --nproc_per_node 8 --nnodes 2 --master_addr ${MASTER_ADDR} --master_port 14545 --node_rank 0 train.py \
  --deepspeed ${deepspeed_config_file} \
  ...

参数

  • nproc_per_node表示每个节点的进程数,可以理解为每个节点所需GPU数
  • nnode表示节点数,2台机器就是2个节点数
  • master_add为master的ip
  • node_rank表示当前启动的是第几个节点

在node02执行同样命令,但需将node_rank指定为1,不出意外的话可以成功跑通,即便报错可能也是依赖包版本两台机器不一致导致。很快就会在控制台看到transformers打印的日志,但发现save_total_limit只在master上管用。

TF

代码编写

  • 1、定义集群
  • 比如假设上面的图所示,我们有四台电脑,名字假设为:A、B、C、D,那么集群可以定义如下
#coding=utf-8
#多台机器,每台机器有一个显卡、或者多个显卡,这种训练叫做分布式训练
import  tensorflow as tf
#现在假设我们有A、B、C、D四台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "A_IP:2222",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
        "B_IP:1234"#第二台机器的IP地址 /job:worker/task:1
        "C_IP:2222"#第三台机器的IP地址 /job:worker/task:2
    ],
    "ps": [
        "D_IP:2222",#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})

然后需要写四分代码,这四分代码文件大部分相同,但是有几行代码是各不相同的。

  • 2、在各台机器上,定义server
    • 比如A机器上的代码server要定义如下:
      server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
      
  • 3、在代码中,指定device
    with tf.device('/job:ps/task:0'):#参数定义在机器D上
      w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
      b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
    with tf.device('/job:worker/task:0/cpu:0'):#在机器A cpu上运行
      addwb=w+b
    with tf.device('/job:worker/task:1/cpu:0'):#在机器B cpu上运行
      mutwb=w*b
    with tf.device('/job:worker/task:2/cpu:0'):#在机器C cpu上运行
      divwb=w/b
    

在深度学习训练中,一般图的计算,对于每个worker task来说,都是相同的,所以我们会把所有图计算、变量定义等代码,都写到下面这个语句下:

with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:indexi',cluster=cluster))

函数replica_deviec_setter会自动把变量参数定义部分定义到ps服务中(如果ps有多个任务,那么自动分配)。下面举个例子,假设现在有两台机器A、B,A用于计算服务,B用于参数服务,那么代码如下:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
import  tensorflow as tf
#现在假设我们有A、B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "192.168.11.105:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
        "192.168.11.130:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})
  
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
  
isps=False
if isps:
    server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    server.join()
else:
    server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
        w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
        b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
        addwb=w+b
        mutwb=w*b
        divwb=w/b

saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
    while 1:
        print sess.run([addwb,mutwb,divwb])

把该代码在机器A上运行,你会发现,程序会进入等候状态,等候用于ps参数服务的机器启动,才会运行。

因此接着我们在机器B上运行如下代码:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
#coding=utf-8
#多台机器,每台机器有一个显卡、或者多个显卡,这种训练叫做分布式训练
import  tensorflow as tf
#现在假设我们有A、B、C、D四台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "192.168.11.105:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
        "192.168.11.130:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})
  
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
  
isps=True
if isps:
    server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    server.join()
else:
    server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
        w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
        b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
        addwb=w+b
        mutwb=w*b
        divwb=w/b
  
saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
    while 1:
        print sess.run([addwb,mutwb,divwb])

分布式训练需要熟悉的函数:

  • tf.train.Server
  • tf.train.Supervisor
  • tf.train.SessionManager
  • tf.train.ClusterSpec
  • tf.train.replica_device_setter
  • tf.train.MonitoredTrainingSession
  • tf.train.MonitoredSession
  • tf.train.SingularMonitoredSession
  • tf.train.Scaffold
  • tf.train.SessionCreator
  • tf.train.ChiefSessionCreator
  • tf.train.WorkerSessionCreator

PyTorch

DP

  • DP就是 DataParallel。DP 是单进程控制多 GPU。
    • DP 将输入的一个 batch 数据分成了 n 份(n 为实际使用的 GPU 数量),分别送到对应的 GPU 进行计算。
    • 在网络前向传播时,模型会从主 GPU 复制到其它 GPU 上;
    • 在反向传播时,每个 GPU 上的梯度汇总到主 GPU 上,求得梯度均值更新模型参数后,再复制到其它 GPU,以此来实现并行。
    • 由于主 GPU 要进行梯度汇总和模型更新,并将计算任务下发给其它 GPU,所以主 GPU 的负载与使用率会比其它 GPU 高,这就导致了 GPU 负载不均衡的现象。

DDP

  • DDP 是 DistributedDataParallel。DDP 多进程控制多 GPU。
    • 系统会为每个 GPU 创建一个进程,不再有主 GPU,每个 GPU 执行相同的任务。
    • DDP 使用分布式数据采样器(DistributedSampler)加载数据,确保数据在各个进程之间没有重叠。
    • 在反向传播时,各 GPU 梯度计算完成后,各进程以广播的方式将梯度进行汇总平均,然后每个进程在各自的 GPU 上进行梯度更新,从而确保每个 GPU 上的模型参数始终保持一致。由于无需在不同 GPU 之间复制模型,DDP 的传输数据量更少,因此速度更快。

DDP 既可用于单机多卡也可用于多机多卡,它能解决 DataParallel 速度慢、GPU 负载不均衡等问题。因此,官方更推荐使用 DistributedDataParallel 来进行分布式训练

基本概念

  • group:进程组。默认情况下,只有一个组,即一个 world。(DDP 多进程控制多 GPU)
  • world_size :表示全局进程个数。
  • rank:表示进程序号,用于进程间通讯,表示进程优先级。rank=0 的主机为主节点。

训练基本流程

  • (1)初始化进程组:用 init_process_group 函数
    • backend:是通信所用的后端,可以是“nccl”或“gloo”。一般来说,nccl 用于 GPU 分布式训练,gloo 用于 CPU 进行分布式训练。
    • init_method:字符串类型,是一个 url,进程初始化方式,默认是 “env://”,表示从环境变量初始化,还可以使用 TCP 的方式或共享文件系统 。
    • world_size:执行训练的所有的进程数,表示一共有多少个节点(机器)。
    • rank:进程的编号,也是其优先级,表示当前节点(机器)的编号。group_name:进程组的名字。
  • (2)模型并行化:用 DistributedDataParallel,将模型分发至多 GPU 上
    • DistributedDataParallel 的参数与 DataParallel 基本相同
  • (3)创建分布式数据采样器

DP 是直接将一个 batch 的数据划分到不同的卡,但是多机多卡间频繁数据传输会严重影响效率,这时就要用到分布式数据采样器 DistributedSampler,它会为每个子进程划分出一部分数据集,从而使 DataLoader 只会加载特定的一个子数据集,以避免不同进程之间有数据重复。

  • 先将 train_dataset 送到了 DistributedSampler 中,并创建了一个分布式数据采样器 train_sampler。
  • 再构造 DataLoader ,, 参数中传入了一个 sampler=train_sampler,即可让不同的进程节点加载属于自己的那份子数据集。也就是说,使用 DDP 时,不再是从主 GPU 分发数据到其他 GPU 上,而是各 GPU 从自己的硬盘上读取属于自己的那份数据。

具体逻辑:

  • 加载模型阶段。每个GPU都拥有模型的一个副本,所以不需要拷贝模型。rank为0的进程会将网络初始化参数broadcast到其它每个进程中,确保每个进程中的模型都拥有一样的初始化值。
  • 加载数据阶段。DDP 不需要广播数据,而是使用多进程并行加载数据。在 host 之上,每个worker进程都会把自己负责的数据从硬盘加载到 page-locked memory。DistributedSampler 保证每个进程加载到的数据是彼此不重叠的。
  • 前向传播阶段。在每个GPU之上运行前向传播,计算输出。每个GPU都执行同样的训练,所以不需要有主 GPU。
  • 计算损失。在每个GPU之上计算损失。
  • 反向传播阶段。运行后向传播来计算梯度,在计算梯度同时也对梯度执行all-reduce操作。
  • 更新模型参数阶段。因为每个GPU都从完全相同的模型开始训练,并且梯度被all-reduced,因此每个GPU在反向传播结束时最终得到平均梯度的相同副本,所有GPU上的权重更新都相同,也就不需要模型同步了。注意,在每次迭代中,模型中的Buffers 需要从rank为0的进程广播到进程组的其它进程上。

代码略,见原文

注意

  • 使用 DDP 意味着使用多进程,如果直接保存模型,每个进程都会执行一次保存操作,此时只使用主进程中的一个 GPU 来保存即可。

Pytorch 分布式训练

分布式基础

分布式模式

PyTorch 原生支持的并行模式:

  • 完全分片数据并行(full sharded data parallel,FSDP
  • 混合分片数据并行(hybrid sharding data parallel,HSDP
  • 张量并行(tensor parallel,TP
  • 流水线并行(pipeline parallel,PP
  • 序列并行(sequence parallel,SP
  • 上下文并行(context parallel,CP

【2023-3-2】PyTorch 分布式训练实现(DP/DDP/torchrun/多机多卡)

相对 Tensorflow,Pytorch 简单的多。分布式训练主要有两个API:

  • DataParallel(DP): PS模式,1张卡为reduce(parame server),实现就1行代码
    • 单进程多线程,仅仅能工作在单机中
    • 将数据分割到多个GPU上。典型的数据并行,将模型复制到每个GPU上,一旦GPU0计算出梯度,就同步梯度到各个节点,这需要大量的GPU数据传输(类似PS模式)
  • DistributedDataParallel(DDP): All-Reduce模式,单机多卡/多级多卡皆可。官方建议API
    • 多进程,单机或多机
    • 每个GPU进程中创建模型副本,并只让数据的一部分对改GPU可用。因为每个GPU中的模型是独立运行的,所以在所有的模型都计算出梯度后,才会在模型之间同步梯度(类似All-reduce)

分析

  • DDP每个batch只需要一次数据传输;
  • DP可能存在多次数据同步(不用worker之间可能快慢不一样)。
  • DataParallel 通常慢于 DistributedDataParallel

【2024-7-24】PyTorch 为数据分布式训练提供了多种选择。

随着应用从简单到复杂,从原型到产品,常见的开发轨迹可以是:

  1. 数据和模型能放入单个GPU,单设备训练,此时不用担心训练速度;
  2. 服务器上有多个GPU,且代码修改量最小,加速训练用单个机器多GPU DataParallel
  3. 进一步加速训练,且愿意写点代码,用单个机器多个GPU DistributedDataParallel
  4. 应用程序跨机器边界扩展,用多机器DistributedDataParallel启动脚本
  5. 预期有错误(比如OOM)或资源可动态连接和分离,使用torchelastic来启动分布式训练。

分布式训练的场景很多,单机多卡,多机多卡,模型并行,数据并行等等。接下来就以常见的单机多卡的情况进行记录。

PyTorch 使用 DDP(Distributed Data Parallel) 实现了真正的分布式数据并行,两个场景下都可使用 DDP 实现模型的分布式训练:

  • (1) 单机、多 GPU(单进程多线程的分布式)
  • (2) 多机、多 GPU(多机多进程的真正分布式)

方法(1)类似简单 DP 数据并行模式

  • DP 使用单进程、多线程范式来实现;
  • 而 DDP 完全使用多进程方式,包括单机多进程、多机多进程

即使单机、多 GPU,也建议使用 DDP 模式,实现基于数据并行的模型训练,使用单机 DDP 模式训练模型的性能要比 DP 模式好很多。

DDP 基于集合通信(Collective Communications)实现分布式训练过程中的梯度同步。

反向传播过程中,DDP 使用 AllReduce 来实现分布式梯度计算和同步。

1、DataParallel

模型与变量必须在同一个设备上(CPU or GPU)

pytorch 使用to函数实现变量或模型的存储转移

  • to函数的对象: 数据Tensor,或 模型Module
  • 张量不执行inplace(即 执行之后重新构建一个新的张量)
  • 模型执行inplace(执行之后不重新构建一个新的模型)

原理:

  • 当给定model时,主要实现功能是将input数据依据batch的这个维度,将数据划分到指定的设备上。其他的对象(objects)复制到每个设备上。在前向传播的过程中,module被复制到每个设备上,每个复制的副本处理一部分输入数据。
  • 在反向传播过程中,每个副本module的梯度被汇聚到原始的module上计算(一般为第0块GPU)。

举例:

  • 如果当前有4个GPU,batch_size=16,那么模型将被复制到每一个GPU上,在前向传播时,每一个gpu将分到4个batch,每个gpu独立计算依据分到的batch计算出结果的梯度,然后将梯度返回到第一个GPU上,第一个GPU再进行梯度融合、模型更新。在下一次前向传播的时候,将更新后的模型再复制给每一个GPU。
###	第一步:构建模型
# module 需要分发的模型
# device_ids 可分发的gpu,默认分发到所有看见GPU(环境变量设置的)
# output_device 结果输出设备 通常设置成逻辑gpu的第一个
module = your_simple_net() #你的模型
Your_Parallel_Net = torch.nn.DataParallel(module, device_ids=None, output_device=None)
### 第二步:数据迁移
inputs=inputs.to(device)	
labels=labels.to(device)	
# device通常应为模型输出的output_device,否则无法计算loss

代码

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os

input_size = 5
output_size = 2
batch_size = 30
data_size = 30

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size), batch_size=batch_size, shuffle=True)

class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output
model = Model(input_size, output_size)

if torch.cuda.is_available():
    model.cuda()

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 就这一行!将模型整体复制到每个GPU上,计算完成后各自汇总到ps节点
    model = nn.DataParallel(model)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = Variable(data.cuda())
    else:
        input_var = Variable(data)
    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

2、DDP(官方建议)

DP 问题

为什么要引入DDP(DistributedDataParallel)?DP 存在问题

  • 1、DP 每个训练批次(batch)中,一个进程上先算出模型权重, 然后再分发到每个GPU上
    • 网络通信就成为了瓶颈,而GPU使用率也通常很低。
    • 显存浪费, 多存储了 n-1 份 模型副本
  • 2、每次前向传播时把模型也复制了(即每次更新都复制一遍模型),并且单进程多线程会造成GIL contention (全局解释器锁争用) 这里进程计算权重使通信成为瓶颈造成了大量的时间浪费,因此引入了DDP。

dp 两个问题:

  • 1️⃣ 显存浪费严重。
    • 以单机八卡为例,把模型复制8份放在8张卡上同时推理,因此多付出了7个模型(副本)的显存开销;
  • 2️⃣ 大模型不适用。
    • 以最新提出的Llama 3.1为例,不经量化(FP16数据类型)的情况下,容纳70B的模型需要140GB的显存,即使是40G一张的A100也无法承受。
    • 而这才仅仅是容纳模型,还没有考虑存放数据,以及训练的话存放梯度数据等。因此数据并行并不适用于70B级别大模型的推理和训练。

DDP采用多进程控制多GPU,共同训练模型,一份代码会被pytorch自动分配到n个进程并在n个GPU上运行。

  • DDP运用 Ring-Reduce通信算法在每个GPU间对梯度进行通讯,交换彼此的梯度,从而获得所有GPU的梯度。

对比DP,不需要在进行模型本体的通信,因此可以加速训练。

torch.nn.DataParallel

  • DataParallel 全程维护一个 optimizer,对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU

注意:

  • 1、设置 DistributedSampler 来打乱数据,因为一个batch被分配到了好几个进程中,要确保不同的GPU拿到的不是同一份数据。
  • 2、要告诉每个进程自己的id,即使用哪一块GPU。
  • 3、如果需要做BatchNormalization,需要对数据进行同步(还待研究,挖坑)

DDP采用All-Reduce架构,单机多卡、多机多卡都能用。

注意:DDP并不会自动shard数据

  1. 如果自己写数据流,得根据torch.distributed.get_rank()去shard数据,获取自己应用的一份
  2. 如果用 Dataset API,则需要在定义Dataloader的时候用 DistributedSampler 去shard

torch.distributed 介绍

torch.nn.DataParallel 支持数据并行,但不支持多机分布式训练,且底层实现相较于 distributed 的接口,有些许不足。

Pytorch 通过 torch.distributed 包提供分布式支持,包括 GPU 和 CPU 的分布式训练支持。

  • Pytorch 分布式目前只支持 Linux。

torch.distributed 优势:

  • 每个进程对应一个独立的训练过程,且只对梯度等少量数据进行信息交换。
    • 迭代中,每个进程具有自己的 optimizer ,独立完成所有优化步骤,进程内与一般的训练无异。
    • 各进程梯度计算完成之后,先将梯度进行汇总平均,再由 rank=0 的进程,将其 broadcast 到所有进程。最后,各进程用该梯度来更新参数。
    • 各进程的模型参数始终保持一致: 各进程初始参数、更新参数都一致
    • 相比 DataParallel, torch.distributed 传输的数据量更少,因此速度更快,效率更高
  • 每个进程包含独立的解释器和 GIL
    • 每个进程拥有独立的解释器和 GIL,消除了单个 Python 进程中的多个执行线程,模型副本或 GPU 的额外解释器开销和 GIL-thrashing ,因此可以减少解释器和 GIL 使用冲突

torch.distributed 概念

【2024-4-7】Pytorch 分布式训练

概念:

  • group:即进程组。默认只有1个组,1个 job 即为1个组,即 1个 world。
    • 当需要进行更加精细的通信时,通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。
  • world_size :表示全局进程数。一个进程可对应多个GPU
    • world_size ≠ GPU数: 1个进程用多个GPU
    • world_size = GPU数: 1个进程用1个GPU
  • local_word_size: 某个节点上进程数 (相对比较少见)
  • rank:全局进程id, 表示进程序号,用于进程间通讯,表征进程优先级。取值范围: 0~world_size
    • rank = 0 主机为 master 节点
  • local_rank:某个节点上进程id, 进程内GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。
    • rank = 3local_rank = 0 表示第 3 个进程内的第 1 块 GPU。
  • global_rank: 全局 gpu编号

如果 所有进程数(world_size)为W,每个节点上的进程数(local_world_size)为L, 则每个进程上的两个ID:

  • rank 取值范围:[0, W-1]
    • rank=0 进程为主进程,负责同步分发工作
    • rank>0 进程为从进程
    • rank=-1, 默认值,非GPU进程?
  • local_rank 取值:[0, L-1]

2机8卡的分布式训练示例

  • gpu 编号: 0~3
  • local rank: gpu 本地编号, 0~3
  • global rank: gpu 全局编号, 0~7

Pytorch 分布式基本流程:

  • 使用 distributed 包任何函数前,用 init_process_group 初始化进程组,同时初始化 distributed 包。
  • 如进行小组内集体通信,用 new_group 创建子分组
  • 创建分布式并行模型 DDP(model, device_ids=device_ids)
  • 为数据集创建 Sampler
  • 使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本,开始训练
  • 使用 destory_process_group() 销毁进程组

torch.distributed 提供了 3 种初始化方式:tcp共享文件环境变量初始化 等。

  • TCP: 指定进程 0 的 ip 和 port, 手动为每个进程指定进程号。
  • 共享文件: 共享文件对于组内所有进程可见
  • 环境变量:

测试代码

import torch.distributed as dist
import argparse, os

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=ine, default=0)
args = parser.parse_args()

# 分布式初始化, 读取环境变量 RANK=1 WORLD_SIZE=3 MASTER_ADDR=127.0.0.1 MASTER_PORT=8000
dist.init_process_group("nccl") # 进程组初始化
rank = dist.get_rank()
local_rank_arg = args.local_rank               # 命令行形式ARGS形式
local_rank_env = int(os.environ['LOCAL_RANK']) # 用env初始ENV环境变量形式
local_world_size = int(os.environ['LOCAL_WORLD_SIZE'])
# local_rank_env = int(os.environ.get('LOCAL_RANK', 0)) # 在利用env初始ENV环境变量形式
# local_world_size = int(os.environ.get('LOCAL_WORLD_SIZE', 3))

print(f"{rank=}; {local_rank_arg=}; {local_rank_env=}; {local_world_size=}")

执行

python3 -m torch.distributed.launch --nproc_per_node=4 test.py 

在一台4卡机器上执行, 样例输出:

# WARNING:torch.distributed.run:
# *****************************************
# Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
# *****************************************
rank=2; local_rank_arg=2; local_rank_env=2, local_world_size=4
rank=0; local_rank_arg=0; local_rank_env=0, local_world_size=4
rank=3; local_rank_arg=3; local_rank_env=3, local_world_size=4
rank=1; local_rank_arg=1; local_rank_env=1, local_world_size=4

一般分布式训练都是为每个进程赋予一块GPU,这样比较简单而且容易调试。 这种情况下,可以通过 local_rank 作为当前进程GPU的id。

数据读取

pytorch 分布式训练,数据读取采用主进程预读取并缓存,其它进程从缓存中读取,不同进程之间的数据同步具体通过torch.distributed.barrier()实现。参考

  • 分布式数据读取: 主进程读取数据 → 主进程缓存 → 从进程读取缓存

进程号rank

多进程上下文中,通常假定rank 0是第一个进程/主进程,其它进程分别具有 0,1,2 不同rank号,这样总共具有4个进程。

(2)单一进程数据处理

通有些操作没必要并行处理,如: 数据读取与处理操作,只需要一个进程进行处理并缓存,然后与其它进程共享缓存处理数据

  • 但由于不同进程同步执行,单一进程处理数据必然会导致进程间不同步现象(数据读取操作处理时间较长)对于较短时间的单一进程程序运行不会影响线程不同步的情况

为此,torch中采用了barrier()函数对其它非主进程进行阻塞,达到同步目的

(3)barrier()具体原理

如果执行 create_dataloader()函数的进程

  • 不是主进程: 即rank不等于0或者-1
    • 上下文管理器会执行相应的 torch.distributed.barrier(),设置一个阻塞栅栏,让此进程处于等待状态,等待所有进程到达栅栏处(包括主进程数据处理完毕);
  • 是主进程: 其会直接读取数据,然后处理结束之后会遇到 torch.distributed.barrier()

此时,所有进程都到达了当前的栅栏处,这样所有进程就达到了同步,并同时得到释放。

def create_dataloader():
    #使用上下文管理器中实现的barrier函数确保分布式中的主进程首先处理数据,然后其它进程直接从缓存中读取
    with torch_distributed_zero_first(rank):
        dataset = LoadImagesAndLabels()
 
from contextlib import contextmanager
 
#定义的用于同步不同进程对数据读取的上下文管理器
@contextmanager
def torch_distributed_zero_first(local_rank: int):
    """
    Decorator to make all processes in distributed training wait for each local_master to do something.
    """
    if local_rank not in [-1, 0]:
        torch.distributed.barrier()
    yield   #中断后执行上下文代码,然后返回到此处继续往下执行
    if local_rank == 0:
        torch.distributed.barrier()

初始化进程组 init_process_group

init_process_group 函数原型

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), 
                                     world_size=-1, rank=-1, store=None)

函数作用

  • 每个进程中进行调用,用于初始化该进程。
  • 使用分布式时,该函数必须在 distributed 内所有相关函数之前使用。

参数详解

  • backend :指定当前进程要使用的通信后端
    • 小写字符串,支持的通信后端有 gloo, mpi, nccl, 建议用 nccl
    • cpu 分布式选 gloo
    • gpu 分布式选 nccl
  • init_method :当前进程组初始化方式
    • 可选参数,字符串形式。两种方式: init_method + store, init_methodstore的高层封装, 二者互斥
    • init_method: TCP连接、File共享文件系统、ENV环境变量三种方式
    • store: 同时指定world_size 和 rank参数。store 是一种分布式中核心的key-value存储,用于不同进程间共享信息
    • 如果未指定, 默认为 env,表示使用读取环境变量方式初始化。该参数与 store 互斥。
  • rank :指定当前进程的优先级
  • int 值。表示当前进程的编号,即优先级。如果指定 store 参数,则必须指定该参数。
    • rank=0 的为主进程,即 master 节点。
  • world_size :该 job 中的总进程数。如果指定 store 参数,则需要指定该参数。
  • timeout : 指定每个进程的超时时间
    • 可选参数,datetime.timedelta 对象,默认为 30 分钟。该参数仅用于 Gloo 后端。
  • store
    • 所有 worker 可访问的 key / value,用于交换连接 / 地址信息。与 init_method 互斥。

三种init_method:

  • init_method=’tcp://ip:port‘: 通过指定rank 0(MASTER进程)的IP和端口,各个进程tcp进行信息交换。需指定 rank 和 world_size 这两个参数。
  • init_method=’file://path‘:通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数。
  • init_method=env://:从环境变量中读取分布式信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。 其中,rank和world_size可手动指定,否则从环境变量读取。

tcp 和 env 两种方式比较类似, 其实 env就是对tcp 一层封装),都是通过网络地址方式进行通信,最常用的初始化方法。

import os, argparse
import torch
import torch.distributed as dist

parse = argparse.ArgumentParser()
parse.add_argument('--init_method', type=str)
parse.add_argument('--rank', type=int)
parse.add_argument('--ws', type=int)
args = parse.parse_args()

if args.init_method == 'TCP':
	dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765', rank=args.rank, world_size=args.ws)
elif args.init_method == 'ENV':
    dist.init_process_group('nccl', init_method='env://')

rank = dist.get_rank()
print(f"rank = {rank} is initialized")
# 单机多卡情况下,localrank = rank. 严谨应该是local_rank来设置device
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)

单机双卡机器上,开两个终端,同时运行命令

# TCP方法
python3 test_ddp.py --init_method=TCP --rank=0 --ws=2
python3 test_ddp.py --init_method=TCP --rank=1 --ws=2
# ENV方法
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=0 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=1 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV

如果开启的进程未达到 word_size 的数量,则所有进程会一直等待,直到都开始运行,可以得到输出如下:

# rank0 的终端:
rank 0 is initialized
tensor([1, 2, 3, 4], device='cuda:0')
# rank1的终端
rank 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')

说明

  • 初始化DDP时,给后端提供主进程的地址端口、本身RANK,以及进程数量即可。
  • 初始化完成后,可以执行很多分布式的函数,比如 dist.get_rank, dist.all_gather 等等。

new_group

函数声明

torch.distributed.new_group(ranks=None, timeout=datetime.timedelta(0, 1800), backend=None)

函数作用

  • new_group() 函数可用于使用所有进程的任意子集来创建新组。其返回一个分组句柄,可作为 collectives 相关函数的 group 参数 。collectives 是分布式函数,用于特定编程模式中的信息交换。

参数详解

  • ranks:指定新分组内的成员的 ranks 列表list ,其中每个元素为 int 型
  • timeout:指定该分组进程组内的操作的超时时间
    • 可选参数,datetime.timedelta 对象,默认为 30 分钟。该参数仅用于 Gloo 后端。
  • backend:指定要使用的通信后端
    • 小写字符串,支持的通信后端有 gloo,nccl ,必须与 init_process_group() 中一致。

其它函数

  • get_backend 获取进程组属性
  • get_rank 获取分布式进程组组内的每个进程的唯一识别
  • get_world_size 获取进程组内的进程数
  • is_initialized 检查默认进程组是否被初始化
  • is_mpi_available 检查 MPI 后端是否可用
  • is_nccl_available 检查 NCCL 后端是否可用
(1) TCP 初始化
import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',rank=args.rank, world_size=4)

说明

  • 不同进程内,均使用主进程的 ip 地址和 port,确保每个进程能够通过一个 master 进行协作。该 ip 一般为主进程所在的主机的 ip,端口号应该未被其他应用占用。
  • 实际使用时,在每个进程内运行代码,并需要为每一个进程手动指定一个 rank,进程可以分布与相同或不同主机上。
  • 多个进程之间,同步进行。若其中一个出现问题,其他的也马上停止。

使用

# Node 1
python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 0 --world-size 2
# Node 2
python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 1 --world-size 2

初始化示例

  • tcp_init.py
import torch.distributed as dist
import torch.utils.data.distributed
# ......
parser = argparse.ArgumentParser(description='PyTorch distributed training on cifar-10')
parser.add_argument('--rank', default=0, help='rank of current process')
parser.add_argument('--word_size', default=2,help="word size")
parser.add_argument('--init_method', default='tcp://127.0.0.1:23456', help="init-method")
args = parser.parse_args()
# ......
dist.init_process_group(backend='nccl', init_method=args.init_method, rank=args.rank, world_size=args.word_size)
# ......
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)
# ......
net = Net()
net = net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net)

执行方式

  • init_method
# Node 1 : ip 192.168.1.201  port : 12345
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 0 --word_size 3
# Node 2 : 
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 1 --word_size 3
# Node 3 : 
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 2 --word_size 3

说明

  • TCP 方式中,init_process_group 中必须手动指定以下参数
    • rank 为当前进程的进程号
    • word_size 为当前 job 总进程数
    • init_method 内指定 tcp 模式,且所有进程的 ip:port 必须一致,设定为主进程的 ip:port
  • 必须在 rank==0 的进程内保存参数。
  • 若程序内未根据 rank 设定当前进程使用的 GPUs,则默认使用全部 GPU,且以数据并行方式使用。
  • 每条命令表示一个进程。若已开启的进程未达到 word_size 的数量,则所有进程会一直等待
  • 每台主机上可以开启多个进程。但是,若未为每个进程分配合适的 GPU,则同机不同进程可能会共用 GPU,应该坚决避免这种情况。
  • 使用 gloo 后端进行 GPU 训练时,会报错。
  • 若每个进程负责多块 GPU,可以利用多 GPU 进行模型并行。
class ToyMpModel(nn.Module):
    def init(self, dev0, dev1):
        super(ToyMpModel, self).init()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

def forward(self, x):
       x = x.to(self.dev0)
       x = self.relu(self.net1(x))
       x = x.to(self.dev1)
       return self.net2(x)
# ......
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
# ......
(2) 共享文件初始化

共享的文件对于组内所有进程可见

设置方式如下:

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

说明

  • file://前缀表示文件系统各式初始化。
  • /mnt/nfs/sharedfile 表示共享文件,各个进程在共享文件系统中通过该文件进行同步或异步。

因此,所有进程必须对该文件具有读写权限。

  • 每一个进程将会打开这个文件,写入自己的信息,并等待直到其他所有进程完成该操作
  • 在此之后,所有的请求信息将会被所有的进程可访问,为了避免 race conditions,文件系统必须支持通过 fcntl 锁定(大多数的 local 系统和 NFS 均支持该特性)。

说明:

  • 若指定为同一文件,则每次训练开始之前,该文件必须手动删除,但是文件所在路径必须存在!

与 tcp 初始化方式一样,也需要为每一个进程手动指定 rank。

使用

# 主机 01 上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2
# 主机 02 上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

相比于 TCP 方式, 麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。

(3) Env 初始化(默认)

默认情况下都是环境变量来进行分布式通信,指定 init_method="env://"

通过在所有机器上设置如下四个环境变量,所有进程将会适当的连接到 master,获取其他进程的信息,并最终与它们握手(信号)。

  • MASTER_PORT: 必须指定,表示 rank0上机器的一个空闲端口(必须设置)
  • MASTER_ADDR: 必须指定,除了 rank0 主机,表示主进程 rank0 机器的地址(必须设置)
  • WORLD_SIZE: 可选,总进程数,可以这里指定,在 init 函数中也可以指定
  • RANK: 可选,当前进程的 rank,也可以在 init 函数中指定

配合 torch.distribution.launch 使用。

实例

# Node 1: (IP: 192.168.1.1, and has a free port: 1234)
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 2
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

代码 env_init.py

import torch.distributed as dist
import torch.utils.data.distributed

# ......
import argparse
parser = argparse.ArgumentParser()
# 注意这个参数,必须要以这种形式指定,即使代码中不使用。因为 launch 工具默认传递该参数
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()

# ......
dist.init_process_group(backend='nccl', init_method='env://')

# ......
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)

# ......
# 根据 local_rank,配置当前进程使用的 GPU
net = Net()
device = torch.device('cuda', args.local_rank)
net = net.to(device)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank], output_device=args.local_rank)

执行方式

# 节点0
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 env_init.py
# 节点1
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=1 --master_addr="192.168.1.201" --master_port=23456 env_init.py
# 节点2
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=2 --master_addr="192.168.1.201" --master_port=23456 env_init.py

说明

  • Env 方式中,init_process_group 无需指定任何参数
  • 必须在 rank==0 的进程内保存参数。

该方式使用 torch.distributed.launch 在每台主机上创建多进程,其中:

  • nproc_per_node 参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数量
  • nnodes 参数指定当前 job 包含多少个节点
  • node_rank 指定当前节点的优先级
  • master_addrmaster_port 分别指定 master 节点的 ip:port
  • 若没有为每个进程合理分配 GPU,则默认使用当前主机上所有的 GPU。即使一台主机上有多个进程,也会共用 GPU。
  • 使用 torch.distributed.launch 工具时,为当前主机创建 nproc_per_node 个进程,每个进程独立执行训练脚本。同时,它还会为每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。
    • 例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程。
  • 需要合理利用 local_rank 参数,来合理分配本地的 GPU 资源
  • 每条命令表示一个进程。若已开启的进程未达到 word_size 数量,则所有进程会一直等待

详见: Pytorch 分布式训练

GPU 启动方式

常见的GPU 启动方式

  • torch.multiprocessing: 容易控制, 更加灵活
  • torch.distributed.launch: 代码量少, 启动速度快
  • torchrun: distributed.launch 的进化版, 代码量更少
  • Slurm Workload Manager: slurm 启动近期更新掉

DDP 本身是一个 python 多进程,完全可以直接通过多进程方式来启动分布式程序。

torch 提供两种启动工具运行torch DDP程序。

  • torch.multiprocessing
  • launch/run
(1) mp.spawn

用 torch.multiprocessing(python multiprocessing的封装类) 自动生成多个进程

基本的调用函数 spawn:

mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)

其中:

  • fn: 进程入口函数,第一个参数会被默认自动加入当前进程的rank, 即实际调用: fn(rank, *args)
  • nprocs: 进程数量,即:world_size
  • args: 函数fn的其他常规参数以tuple形式传递

示例

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def fn(rank, ws, nums):
    dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765', rank=rank, world_size=ws)
    rank = dist.get_rank()
    print(f"rank = {rank} is initialized")
    torch.cuda.set_device(rank)
    tensor = torch.tensor(nums).cuda()
    print(tensor)

if __name__ == "__main__":
    ws = 2
    mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4]))

命令

python3 test_ddp.py

输出如下:

rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')

这种方式同时适用于 TCP 和 ENV 初始化。

(2) launch/run

torch 提供的 torch.distributed.launch 工具,以模块形式直接执行:

python3 -m torch.distributed.launch --配置 train.py --args参数

常用配置有:

  • nnodes: 使用的机器数量,单机的话,就默认是1了
  • nproc_per_node: 单机的进程数,即单机的worldsize
  • master_addr/port: 使用的主进程rank0的地址和端口
  • node_rank: 当前的进程rank

单机情况下

  • 只有 –nproc_per_node 是必须指定
  • master_addr/portnode_rank 都是可以由launch通过环境自动配置
mport torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os

dist.init_process_group('nccl', init_method='env://')

rank = dist.get_rank()
local_rank = os.environ['LOCAL_RANK']
master_addr = os.environ['MASTER_ADDR']
master_port = os.environ['MASTER_PORT']
print(f"rank = {rank} is initialized in {master_addr}:{master_port}; local_rank = {local_rank}")
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)

启动命令

python3 -m torch.distribued.launch --nproc_per_node=2 test_ddp.py

输出如下:

rank = 0 is initialized in 127.0.0.1:29500; local_rank = 0
rank = 1 is initialized in 127.0.0.1:29500; local_rank = 1
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')
(3) torchrun

torch 1.10 开始用终端命令 torchrun 来代替 torch.distributed.launch

  • torchrun 实现了 launch 的一个超集

不同:

  • 完全使用环境变量配置各类参数,如 RANK,LOCAL_RANK, WORLD_SIZE 等,尤其是 local_rank 不再支持用命令行隐式传递的方式
  • 更加优雅处理某个worker失败情况,重启worker。
    • 需要代码中有 load_checkpoint(path) 和 save_checkpoint(path) 这样有worker失败的话,可以通过load最新的模型,重启所有的worker接着训练。
  • 训练节点数目可以弹性变化。

上面代码直接使用运行即可,不用写那么长长的命令了。

torchrun --nproc_per_node=2 test_gpu.py

注意

  • torchrun 或者 launch 对上面ENV初始化方法支持最完善, TCP初始化方法的可能会出现问题,尽量使用env来初始化dist。

torch.distributed 使用

使用方式(单机多卡环境)

# 启动方式,shell中运行:
python -m torch.distributed.launch --nnodes 1 --nproc_per_node=4  YourScript.py
# nnodes: 表示有多少个节点,可以通俗的理解为有多少台机器
# nproc_per_node 表示每个节点上有多少个进程,每个进程一般独占一块GPU
########################## 	第1步	 ##########################
#初始化
'''
在启动器为我们启动python脚本后,在执行过程中,启动器会将当前进程的(其实就是 GPU的)index 通过参数传递给 python,
我们可以这样获得当前进程的 index:即通过参数 local_rank 来告诉我们当前进程使用的是哪个GPU,
用于我们在每个进程中指定不同的device
'''
parse.add_argument('--local_rank',type=int)
args=parser.parse_args()
local_rank=args.local_rank
torch.cuda.set_device(local_rank)
'''
init_process_group用于初始化GPU通信方式(NCCL)和参数的获取方式(env代表通过环境变量)
gpu使用nccl最快,gloo为cpu分布式训练,mpu则需要重新编码
init_method 指定如何初始化进程组的 URL。 
默认及推荐为'env://' 其他初始化方式与多机多卡有关(not sure,挖个坑)
'''
torch.distributed.init_process_group('nccl'init_method='env://')
device = torch.device(f'cuda:{args.local_rank}')
##########################	第2步  ##########################
#处理Dataloader
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,shuffle=True)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
#torch.utils.data.DataLoader中的shuffle应该设置为False(默认),因为打乱的任务交给了sampler
##########################	第3步  ##########################
#模型的初始化
model=torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
'''
使用 DistributedDataParallel 包装模型,
它能帮助我们为不同 GPU 上求得的梯度进行allreduce(即汇总不同 GPU 计算所得的梯度,并同步计算结果)。
allreduce 后不同 GPU 中模型的梯度均为 allreduce 之前各 GPU 梯度的均值。
''''
##########################	第4步  ##########################
#同DP,进行inputs、labels的设备转移
sampler = DistributedSampler(dataset) # 这个sampler会自动分配数据到各个gpu上
DataLoader(dataset, batch_size=batch_size, sampler=sampler)

完整代码如下:

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2) 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3)使用DistributedSampler
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         sampler=DistributedSampler(dataset))

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) 封装之前要把模型移到对应的gpu
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装:
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data
    else:
        input_var = data
    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

执行脚本:

# 启用 DDP 模式
CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 torch_ddp.py

apex加速(混合精度训练、并行训练、同步BN)可参考

代码分布式改造

如何将单机训练代码改成分布式运行?

基本流程:

  • 分布式训练数据加载
  • 分布式训练
  • 分布式评估

分布式数据集

Dataloader 要把所有数据分成N份(N为worldsize), 并能正确分发到不同进程中,每个进程可以拿到一个数据的子集,不重叠,不交叉。

这部分工作靠 DistributedSampler 完成,函数签名如下:

torch.utils.data.distributed.DistributedSampler(dataset,
				num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)

参数说明

  • dataset: 需要加载的完整数据集
  • num_replicas: 把数据集分成多少份,默认是当前dist的world_size
  • rank: 当前进程的id,默认从dist的rank
  • shuffle:是否打乱
  • drop_last: 如果数据长度不能被world_size整除,可以考虑是否将剩下的扔掉
  • seed:随机数种子。
    • 注意: 从源码中可以看出,真正的种子其实是 self.seed + self.epoch, 好处是,不同epoch每个进程拿到的数据是不一样,因此要在每个epoch开始前设置下:sampler.set_epoch(epoch)

Sampler 实现核心代码:

indices[self.rank: self.total_size: self.num_replicas]

假设4卡12条数据,rank=0,1,2,3, num_replicas=4, 那么每个卡取的数据索引就是:

rank0: [0 4 8]; rank1: [1 5 9]; rank2: [2 6 10]; rank3: [3 7 11]

保证不重复不交叉。这样在分布式训练的时候,只需要给 Dataloader 指定 DistributedSampler 即可,简单示例如下:

sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):
  sampler.set_epoch(epoch) # 设置epoch 更新种子
  train(loader)

模型的分布式训练封装。将单机模型使用 torch.nn.parallel.DistributedDataParallel 进行封装,如下:

torch.cuda.set_device(local_rank)
model = Model().cuda()
model = DistributedDataParallel(model, device_ids=[local_rank])
# 要调用model内的函数或者属性. model.module.xxxx

多卡训练时,每个进程有一个model副本和optimizer,使用自己的数据进行训练,之后反向传播计算完梯度的时候,所有进程的梯度会进行 all-reduce 操作进行同步,进而保证每个卡上的模型更新梯度是一样的,模型参数也是一致的。

注意

  • 在save和load模型时候,为了减小所有进程同时读写磁盘,以主进程为主,rank0 先save模型,在map到其他进程。
  • 另外一个好处: 最开始训练时,模型随机初始化之后,保证了所有进程的模型参数保持一致。

torch的DDP封装时,已经做到了这一点,即使开始随机初始化不同,经过DDP封装,所有进程都一样的参数

简洁代码如下:

model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH ="./model.checkpoint"
if rank == 0:
  torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# barrier()其他保证rank 0保存完成
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))
# 后面正常训练代码
optimizer = xxx
for epoch:
  for data in Dataloader:
      model(data)
      xxx
    # 训练完成 只需要保存rank 0上的即可
    # 不需要dist.barrior(), all_reduce 操作保证了同步性
  if rank == 0:
     torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

分布式训练

DDP分布式训练步骤:

  • 初始化进程组 dist.init_process_group
  • 设置分布式采样器 DistributedSampler
  • 使用DistributedDataParallel封装模型
  • 使用torchrun 或者 mp.spawn 启动分布式训练

使用分布式做 evaluation 时,要先把所有进程的输出结果进行 gather,再进行指标计算,两个常用函数:

dist.all_gather(tensor_list, tensor) # 将所有进程的tensor进行收集并拼接成新的tensorlist返回,比如:
dist.all_reduce(tensor, op) # 对tensor 的 in-place 操作, 对所有进程的某个tensor进行合并操作,op可以是求和等

代码

import torch
import torch.distributed as dist

dist.init_process_group('nccl', init_method='env://')
rank = dist.get_rank()
torch.cuda.set_device(rank)

tensor = torch.arange(2) + 1 + 2 * rank
tensor = tensor.cuda()
print(f"rank {rank}: {tensor}")

tensor_list = [torch.zeros_like(tensor).cuda() for _ in range(2)]
dist.all_gather(tensor_list, tensor)
print(f"after gather, rank {rank}: tensor_list: {tensor_list}")

dist.barrier()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
print(f"after reduce, rank {rank}: tensor: {tensor}")

命令

torchrun --nproc_per_node=2 test_ddp.py

输出结果如下:

rank 1: tensor([3, 4], device='cuda:1')
rank 0: tensor([1, 2], device='cuda:0')
after gather, rank 1: tensor_list: [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')]
after gather, rank 0: tensor_list: [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')]
after reduce, rank 0: tensor: tensor([4, 6], device='cuda:0')
after reduce, rank 1: tensor: tensor([4, 6], device='cuda:1')

分布式评估

evaluation 时,可以拿到所有进程中模型输出,最后统一计算指标,基本流程如下:

pred_list = []
for data in Dataloader:
    pred = model(data)
    batch_pred = [torch.zeros_like(label) for _ in range(world_size)]
    dist.all_gather(batch_pred, pred)
    pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 1)
# 所有进程pred_list是一致的,保存所有数据模型预测的值

pytorch 分布式操作

【2024-8-4】彻底搞清楚torch. distributed分布式数据通信all_gather、all_reduce

all_gather和all_reduce;gather、reduce、scatter方法对比

all_gather

分布式操作

  • gather 操作用于在不同节点间收集信息
  • 首先初始化一个空 Tensor 列表 tensor_list, 用于接收所有节点的信息
  • 然后调用 all_gather 在所有节点中得到包含每个节点本地张量的列表
  • 列表中有 world_size 个元素,每个元素都是bs大小,后续通过cat操作即可得到大小为 bs * world_size 表示

Pytorch DDP 分布式数据合并通信 torch.distributed.all_gather()

torch.distributed.all_gather()

函数定义

  • tensor_list 是list,大小是 word_size,每个元素为了是gather后,保存每个rank的数据,所以初始化一般使用torch.empty;
  • tensor 代表各rank中的tensor数据,其中tensor_list每个分量的维度要与对应的tensor参数中每个rank的维度相同。
all_gather(tensor_list, tensor, group=None, async_op=False)
  • tensor_list 每个元素代表每个rank的数据
  • tensor 代表每个进程中的tensor数据
  • 其中tensor_list每个分量的维度要与对应的tensor参数中每个rank的维度相同。
# 两个机器,每个4张卡,批大小为bs
tensor = torch.arange(bs, dtype=torch.int64) + 1 + 2 * rank
tensor_list = [torch.zeros(bs, dtype=torch.int64) for _ in range(torch.distributed.get_world_size())]
dist.all_gather(tensor_list, tensor)
tensor_list

all_reduce

all_reduce 操作用于在不同节点中同步信息

  • 调用该方法, 在所有节点中求和/平均,使用前后大小均为bs
tensor = torch.arange(bs, dtype=torch.int64) + 1 + 2 * rank
dist.all_reduce(tensor, op=ReduceOp.SUM)
tensor

all_reduce 函数定义

  • tensor 代表各rank中的tensor数据,op代表可以选择的操作,主要有: SUM、PRODUCT、MIN,MAX、BAND、BOR、BXOR、PREMUL_SUM

Torchrun (更新)

PyTorch 官网介绍

  • This module(torch.distributed.launch) is going to be deprecated in favor of torchrun.

Pytorch 1.9.0 引入了 torchrun,替代以前的 torch.distributed.launch

  • torchruntorch.distributed.launch 的超集, elastic launch, 等效于 python -m torch.distributed.run
  • torchrun 包含 torch.distributed.launch 几乎所有功能(除了废弃的--use-env)

torchrun 包含了 torch.distributed.launch 所有功能,还有三点额外功能:

  • 1、worker_rankworld_size 将被自动分配
  • 2、Failover: worker失败时, 重新启动所有workers来处理 workers 故障
  • 3、Elastic: 动态增减节点, 允许节点数目在最大/最小值之间改变, 即具备弹性

用法

几种模式

  • 单机多卡 torchrun –standalone –nnodes=1 –nproc_per_node=N inference.py –args
  • 多机多卡 torchrun –nnodes=M –nproc_per_node=N inference.py –args

–nnodes:计算节点(也就是机器)的数量,单机的话就是1,M机的话就是M –nproc_per_node:每个节点(每台机器)上进程的数量。因为一个进程需要放在一张显卡上跑,因此进程的数量也就是显卡的数量,比如单机八卡,就要将该参数设置为8 –args:运行inference.py脚本所需的参数

torchrun 示例

2机8卡 分布式训练示例

  • gpu 编号: 0~3
  • local rank: gpu 本地编号, 0~3
  • global rank: gpu 全局编号, 0~7

环境

train_elastic.py

def run():
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "LOCAL_WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    train()
    dist.destroy_process_group()

if __name__ == "__main__":
    run()

启动脚本 run_elastic.sh

  • node0 和 node1 上分别执行脚本
torchrun \
    --nnodes=1:3\
    --nproc_per_node=4\
    --max_restarts=3\
    --rdzv_id=1\
    --rdzv_backend=c10d\
    --rdzv_endpoint="192.0.0.1:1234"\
    train_elastic.py

描述如下(注:node0和node1均通过该脚本进行启动)

  • nnodes=1:3: 当前训练任务接受最少1个node,最多3个node, 参与分布式训练;
  • nproc_per_node=4: 每个node上节点有4个process
  • max_restarts=3: worker group最大的重启次数;
    • 注意: node fail、node scale down和node scale up都会导致restart;
  • rdzv_id=1:一个unique的job id,所有node均使用同一个job id;
  • rdzv_backend: rendezvous backend实现,默认支持c10d和etcd两种;rendezvous用于多个node之间的通信和协调;
  • rdzv_endpoint: rendezvous 地址,应该为一个node的host ip和port;

迁移 launch -> torchrun

torch.distributed.launch -> torchrun

迁移方法

python -m torch.distributed.launch -> torchrun
# (1) 如果 从环境变量(LOCAL_RANK)中读取 local_rank 参数, 直接忽略
# 更改前
python -m torch.distributed.launch --use-env train_script.py
# 更改后
torchrun train_script.py
# (2) 如果 从命令行(--local-rank)读取 local_rank 参数
# 更改前
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local-rank", type=int)
args = parser.parse_args()
local_rank = args.local_rank
# 更改后
import os
local_rank = int(os.environ["LOCAL_RANK"])
# local_rank参数应当从环境变量中读取,而不是通过参数传递。
### ------ BEFORE ------
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()

local_rank = args.local_rank
### ------ NOW -------
import os
local_rank = int(os.environ["LOCAL_RANK"])

#运行脚本
torchrun train_script.py #除了--use_env参数,其他torch.distributed.launch所使用的参数均可使用,
			 #如nnodes、nproc_per_node

初始化 init_process_group

dist.init_process_group() 是PyTorch中用于初始化分布式训练的函数之一。

  • 作用: 设置并行训练环境,连接多个进程以进行数据和模型的分布式处理。

通过init_process_group()函数这个方法来进行初始化

其参数包括以下内容

  • backend(必需参数):指定分布式后端的类型,选项之一:
    • ‘tcp’:使用TCP协议进行通信。
    • ‘gloo’:使用Gloo库进行通信。
    • ‘mpi’:使用MPI(Message Passing Interface)进行通信。
    • ‘nccl’:使用NCCL库进行通信(适用于多GPU的分布式训练)。
    • ‘hccl’:使用HCCL库进行通信(适用于华为昇腾AI处理器的分布式训练)。
  • init_method(可选参数):指定用于初始化分布式环境的方法。它可以是以下选项之一:
    • ‘env://’:使用环境变量中指定的方法进行初始化。
    • ‘file:// ’:使用本地文件进行初始化。
    • ‘tcp://:’:使用TCP地址和端口进行初始化。
    • ‘gloo://:’:使用Gloo地址和端口进行初始化。
    • ‘mpi://:’:使用MPI地址和端口进行初始化。
  • rank(可选参数):指定当前进程的排名(从0开始)。
  • world_size(可选参数):指定总共使用的进程数。
  • timeout(可选参数):指定初始化的超时时间。
  • group_name(可选参数):指定用于连接的进程组名称。

多机多卡 DDP

概念理解

  • group: 进程组,通常DDP的各个进程都是在同一个进程组下
  • world_size: 总的进程数量(原则上,一个进程占用一个GPU)
  • rank:当前进程的序号,用于进程间通信,rank=0表示主机为master节点
  • local_rank:当前进程对应的GPU号

举个栗子 :

  • 4台机器 (每台机器8张卡) 进行分布式训练。
  • 通过 init_process_group() 对进程组进行初始化。
  • 初始化后 可以通过 get_world_size() 获取到 world size = 32。
  • 在该例中为32, 即有32个进程,其编号为0-31 通过 get_rank() 函数可以进行获取 在每台机器上,local rank均为0-8, 这是 local rank 与 rank 的区别, local rank 会对应到实际的 GPU ID 上。
########################## 	第1步	 ##########################
#初始化
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(rank % torch.cuda.device_count())
dist.init_process_group(backend="nccl")
device = torch.device("cuda", local_rank)
########################## 	第2步	 ##########################
#模型定义
model = model.to(device)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

#数据集操作与DDP一致

#####运行
'''
exmaple: 2 node, 8 GPUs per node (16GPUs)
需要在两台机器上分别运行脚本
注意细节:node_rank master 为 0 
机器1
>>> python -m torch.distributed.launch \
    --nproc_per_node=8 \
    --nnodes=2 \
    --node_rank=0 \
    --master_addr="master的ip" \
    --master_port=xxxxx \
    YourScript.py
机器2
>>> python -m torch.distributed.launch \
    --nproc_per_node=8 \
    --nnodes=2 \
    --node_rank=1 \
    --master_addr="master的ip" \
    --master_port=xxxxx \
    YourScript.py

'''

FSDP (DDP改进)

DistributedDataParallel (DDP) 训练

多机多卡方式中

  • 每个 process/worker 都有模型的一个副本(Replica)
  • 每个 process/worker 处理一个 batch 数据, 并行处理
  • 最后用 all-reduce 操作对多个不同 process/worker 计算得到的梯度进行累加求和;
  • 接着,再将优化器状态梯度通过跨多个 process/worker 进行复制,使得每个 process/worker 上的模型参数都得到同步更新。

DDP 中,模型权重和优化器状态在所有工作线程中复制。

  • 核心能力还是训练数据并行(Data Parallel)
  • DDP 没有实现对模型参数分片管理,即模型并行(Model Parallel)

PyTorch 1.11 中发布 FSDP

FSDP 实现了模型的分片管理能力,真正实现了模型并行

  • 将模型分片后,使用 FSDP 训练模型,每个 GPU 只保存模型的一个分片,这样能够使 GPU 的内存占用比 DDP 方式小得多,从而使分片的大模型和数据能够适配 GPU 容量,更有希望实现超大模型的分布式训练。
  • 问题: process/worker 节点之间的通信开销一定程度增加,但是可在 PyTorch 内部有针对性地进行优化来降低通信代价,比如对通信、计算进行 overlapping 能够很好地降低由此带来的网络开销。

使用 FSDP 训练时,GPU 内存占用量比在所有工作线程上使用 DDP 进行训练时要小。

  • 允许更大模型或批量大小适合设备,使一些非常大的模型的训练变得可行。
  • 这是伴随着通信量增加的成本而来的。通过内部优化(例如重叠通信和计算)减少了通信开销。

图解

FSDP 原理

FSDP 在不同阶段的基本处理过程,如下所示:

  • 01 初始化阶段
    • 分片模型参数,每个 rank 只有自己的分片
  • 02 forward 阶段
    • 运行 all_gather,收集所有 rank 上的模型参数分片,生成恢复得到模型参数,以保证满足当前 FSDP Unit 的计算需要
    • 运行 forward 计算过程
    • 丢掉所有被收集过的其它 rank 上的模型参数分片
  • 03 backward 阶段
    • 运行 all_gather, 收集所有 rank 上的模型参数分片,恢复全部的模型参数,以保证满足当前 FSDP Unit 的计算需要
    • 运行 backward 计算过程
    • 运行 reduce_scatter, 在所有 rank 之间同步梯度
    • 丢掉所有从其它 rank 上收集过的模型参数分片

查看 FSDP’s 分片方法

  • 将 DDP 梯度全归约分解为归约分散全聚集
  • 向后传递过程中,FSDP 减少并分散梯度,确保每个等级都拥有梯度碎片。
  • 在优化器步骤中更新参数的相应分片。
  • 最后,后续前向传播中,执行全收集操作来收集并组合更新的参数分片

分片原理

FSDP 默认的分片策略(Sharding Strategy)是对模型参数梯度优化器状态都进行分片处理,即 Zero3 分片策略

  • 编程中可以使用 ShardingStrategy.FULL_SHARD 来指定。

对于 Zero2 分片策略,只对梯度优化器状态进行分片处理

  • 编程中可以使用 ShardingStrategy.SHARD_GRAD_OP 来指定。
  • 如果配置使用 Zero2 分片策略,那么所有模型参数都会全量加载到每个 rank 对应的 GPU 内,即每个 GPU 持有一个模型的副本。
  • forward 阶段和 backward 阶段模型参数都在 GPU 内而不会被 offload 到 CPU,这样就不需要频繁地在多个 GPU 之间传输模型参数分片信息,能够在一定程度上降低 FSDP 集群的通信开销。

FSDP 处理模型分片的总体流程

  • 论文《PyTorch FSDP: Experiences on Scaling Fully Sharded Data Parallel》

模型具有 6 个层,FSDP 将其分解为 3 个 FSDP Unit,分别为

  • Unit0 = [layer0, layer3]
  • Unit1 = [layer1, layer2]
  • Unit2 = [layer4, layer5]

进行 forward 和 backward 计算之前需要从其它 rank 上收集对应的参数分片,从而保证计算是正确的。

以 Unit1 为例, 说明如何进行分片处理,该 FSDP Unit 包含了 layer1 和 layer2 两层。

  • 进行 forward 计算之前,需要将这两层的参数对应于其它 rank 上的分片收集过来使 layer1 和 layer2 两层的参数是 Unsharded,即保证参数是完整的以便进行计算,然后在本地执行 forward 计算过程,完成 layer0 和 layer3 这两层的计算逻辑。当 forward 计算完成后,会释放掉刚刚从其它 rank 上收集到的参数分片,以降低内存空间的占用。每一轮 forward 计算,FSDP 一次只需要处理一个 Unit 的参数即可,而其它的 Unit 仍然保持其参数的分片状态。
  • 对于 backward 计算的过程也是类似的,它会先计算 layer2,再计算 layer1,在开始计算 layer2 层之前,FSDP 会从其它 rank 上收集 layer2、layer1 层的分片参数,恢复得到这两层完整的参数后,Autograd 引擎会继续完成 layer2、layer1 这两层的计算,随后释放掉从其它 rank 上收集过来的参数分片。接着,FSDP 会进行 reduce-scatter 操作对梯度进行累加并分片。当 backward 计算结束后,每个 rank 都只保存了模型参数和梯度的分片部分。

FSDP 模型初始化

FSDP 模型初始化时,通过指定一个 device_id 参数来绑定到指定的 GPU 上

  • 首先模型的 Module 会在 CPU 中初始化
  • 然后加载到 GPU 内。

通过指定 device_id 能够保证当 GPU 无法容纳大的模型时,它能够 offload 到 CPU 中,而不至于出现 OOM 的问题。

创建 FSDP 模型

  • 只要将模型(继承自 nn.Module) model,通过 FSDP 进行 wrap 即可
  • 其中指定一些满足需要的配置选项

参数

  • auto_wrap_policy: 自动将模型分片处理,包括对模型参数优化器状态梯度进行分片,每个分片都放到一个不同的 FSDP Unit 中
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

model = DDP(model)

torch.cuda.set_device(local_rank)
model = FSDP(model,
        auto_wrap_policy=t5_auto_wrap_policy,
        mixed_precision=bfSixteen,
        device_id=torch.cuda.current_device())

model = FSDP(model,
    auto_wrap_policy=my_auto_wrap_policy,
    cpu_offload=CPUOffload(offload_params=True))

注意

  • Transformer Encoder-Decoder 架构模型包含一些被 Encoder 和 Decoder 共享部分,比如 embedding 表,如果直接使用上面的 auto_wrap_policy 参数指定 Wrap Policy, 会使神经网络模型中这些共享的部分无法被共享
  • 所以只能把共享部分移动到 FSDP Unit 外部去,以便 Encoder 和 Decoder 都能访问这部分。
  • PyTorch 1.12 引入处理这种情况的特性,为 Transformer 注册一个 共享 Layer 实现类,使 FSDP 的分片计划(Sharding Plan)实现高效的通信处理。
t5_auto_wrap_policy = functools.partial(
        transformer_auto_wrap_policy,
        transformer_layer_cls={
            T5Block, # T5 Transformer 层的实现类,封装了 MHSA 和 FFN 两层
        },
    )
torch.cuda.set_device(local_rank)
model = FSDP(model, fsdp_auto_wrap_policy=t5_auto_wrap_policy)

分布式训练高层封装

对 torch 几个流程进行一层封装【初始化、包装模型、优化器、数据加载】。

考虑几个因素

  • 支持分布式训练模式丰富,如 CPU,单机单卡,单机多卡,多机多卡,FP16等
  • 代码简单,不需要改动大量代码, 即可进行分布式训练
  • 接口丰富,方便自定义。比如 能调用和访问底层分布式的一些变量如rank,worldsize,或实现或封装一些分布式函数,比如dist.gather/reduce等。

得到更加易用的框架:

  • Accelerator
  • Horovod

这两个都是非常易用的分布式框架。 还有一些其他的,比如 pytorch-lightningdeepspeed

以bert情感分类为例子,介绍了如何使用原生DDP和上面2个框架来进行分布式训练

Accelerator

由大名鼎鼎的 huggingface 发布的 Accelerator,专门适用于Pytorch 分布式训练框架:

将单进程代码改为多进程分布式:

import accelerate
accelerator = accelerate.Accelerator()
device = accelerator.device #获取当前进程的设备
...
# 进行封装
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

#训练时 loss.backward() 换为:
accelerator.backward(loss)

使用CLI命令行方式运行,先使用 accelerator config 配置一次分布式训练参数,之后就使用 acceleratoe launch 运行。

除此之外,accelerator 还提供了一些很便利的接口,基本覆盖了分布式训练中需要用到的方法,比如:

  • accelerator.print: 仅仅在主进程输出
  • accelerator.process_index: 当前进程ID,没有使用rank命名,而是用的process_index来表示
  • accelerator.is_local_main_process/is_main_processs: 是否local_rank 或则rank为0, 主进程
  • accelerator.wait_for_everyone(): 类似 dist.barrier() , 等所有进程到达这一步。
  • accelerator.save: 保存模型
  • kwargs_handlers: 可以定义DDP初始化的一些参数,比如最常用的就是 find_unused_parameters,比如:
import accelerate
from accelerate import DistributedDataParallelKwargs as DDPK
kwargs = DDPK(find_unused_parameters=True)
accelerator = accelerate.Accelerator(kwargs_handlers=[kwargs])

accelerator 基本已经满足使用 Pytorch 进行分布训练的需求,而且十分符合 huggingface 风格,把某个小项目做到最好用,类似的还有 transformers, tokenizers, datasets 等等。

不足

  • accelerate 支持的 collective function 比较少,目前只有 all_gather。

Horovod 第二个常用的分布式库Horovod是一个通用的深度学习分布式训练框架,支持Tensorflow,Pytorch,MXNet,Keras等等,因此比Accelerator要更加重些,但是功能也会更加丰富,这里以Pytorch为例来简单介绍。多说一下,Horovod的安装相对复杂一些,需要针对具体的环境参考readme进行安装。

GitHub:https://github.com/horovod/horovod 官网:https://horovod.ai/ Horovod的使用也很简单,基本也是那几个流程:

import horovod.torch as hvd
# 初始化
hvd.init()
# Samapler
# *此处num_replicas=hvd.size(), rank=hvd.rank()必须*
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# 优化器包装
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# 模型分发广播
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
# 模型训练不需要修改

horovod 支持的运行方式非常多,最常用的就是 horovodrun ,比如单机四卡运行:

horovodrun -np 4 -H localhost:4 python3 train.py

horovod 相比 accelerate 功能更加丰富,支持的接口,函数,框架都要多, 比如: hvd.all_reduce, hvd.all_gather等等。

Horovod

Horovod 是 Uber开源的跨平台的分布式训练工具,名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与Horovod设备之间的通信模式很像,有以下几个特点:

  • 兼容TensorFlow、Keras和PyTorch机器学习框架。
  • 使用Ring-AllReduce算法,对比Parameter Server算法,有着无需等待,负载均衡的优点。
  • 实现简单,五分钟包教包会。

Horovod环境准备以及示例代码,可参考上一篇

Pytorch 1.x 多机多卡计算模型没有采用主流的 Parameter Server 结构,而是直接用了Uber Horovod 的形式,即百度开源的 RingAllReduce 算法

Uber 的 Horovod 采用 RingAllReduce 计算方案,特点:网络单次通信量不随着 worker(GPU) 的增加而增加,是一个恒定值。

与 TreeAllReduce 不同,RingAllreduce 算法的每次通信成本是恒定的,与系统中 gpu 的数量无关,完全由系统中 gpu 之间最慢的连接决定。

新技术

DisTrO

【2024-8-29】DisTrO 让你家里的电脑也能训练超级大模型

Nous Research 最近放出了一份重磅报告,介绍最新研究成果——DisTrO(Distributed Training Over-the-Internet)。

有望让告别”只有大公司才能训练大模型”的时代,开启全民AI狂欢!

DisTrO 是一个分布式优化器家族,两个超级牛X的特点:

  • 与架构无关:不管你用啥架构,它都能用。
  • 与网络无关:网速慢?没关系,它照样能跑!

最厉害的是,DisTrO把GPU之间的通信需求减少了1000到10000倍!

在龟速网络上,用各种杂牌子的网络硬件,也能训练大型神经网络,而且收敛速度跟AdamW+All-Reduce一样快!

DisTrO 究竟有什么用呢?

  • 提高LLM训练的抗风险能力:不再依赖单一实体的计算能力,训练过程更安全、更公平。
  • 促进研究合作与创新:研究人员和机构可以更自由地合作,尝试新技术、新算法、新模型。
  • 推动AI民主化:降低了训练大模型的门槛,让更多人有机会参与其中。

分布式训练库

详见站内专题: 分布式训练库

结束


支付宝打赏 微信打赏

~ 海内存知已,天涯若比邻 ~

Share

Related Posts

下一篇 分布式训练

标题:DeepSpeed 学习笔记

摘要:DeepSpeed 知识点、训练技巧总结

标题:分布式训练

摘要:分布式训练知识点

站内可视化导航

文章可视化导读:鼠标划过图形块时,如果出现蓝色光环, 点击即可跳转到对应主题

Comments

--disqus--

    Content
    My Moment ( 微信公众号 )
    欢迎关注鹤啸九天