[源码解析] PyTorch 分布式(18) --- 使用 RPC 的分布式管道并行

时间:2023-03-08 22:57:46
[源码解析] PyTorch 分布式(18) --- 使用 RPC 的分布式管道并行

[源码解析] PyTorch 分布式(18) --- 使用 RPC 的分布式管道并行

0x00 摘要

在前面的文章之中,我们已经学习了PyTorch 分布式的基本模块,接下来我们通过几篇文章来看看如何把这些模块应用到实践之中,顺便把PyTorch分布式逻辑整体梳理一下。本文介绍如何使用 RPC 来完成分布式管道并行。

本文以DISTRIBUTED PIPELINE PARALLELISM USING RPC 的翻译为基础,加入了自己的理解。

PyTorch分布式其他文章如下:

深度学习利器之自动微分(1)

深度学习利器之自动微分(2)

[源码解析]深度学习利器之自动微分(3) --- 示例解读

[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)

[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)

[源码解析] PyTorch如何实现前向传播(3) --- 具体实现

[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎

[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构

[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法

[源码解析] PyTorch 分布式(1)------历史和概述

[源码解析] PyTorch 分布式(2) ----- DataParallel(上)

[源码解析] PyTorch 分布式(3) ----- DataParallel(下)

[源码解析] PyTorch 分布式(4)------分布式应用基础概念

[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用

[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store

[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组

[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇

[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

[源码解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer静态架构

[源码解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 构建Reducer和Join操作

[源码解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向传播

[源码解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向传播

[源码解析] PyTorch 分布式 Autograd (1) ---- 设计

[源码解析] PyTorch 分布式 Autograd (2) ---- RPC基础

[源码解析] PyTorch 分布式 Autograd (3) ---- 上下文相关

[源码解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎

[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

[源码解析] PyTorch 分布式 Autograd (6) ---- 引擎(下)

[源码解析] PyTorch分布式优化器(1)----基石篇

[源码解析] PyTorch分布式优化器(2)----数据并行优化器

[源码解析] PyTorch分布式优化器(3)---- 模型并行

[源码解析] PyTorch 分布式(14) --使用 Distributed Autograd 和 Distributed Optimizer

[源码解析] PyTorch 分布式(15) --- 使用分布式 RPC 框架实现参数服务器

[源码解析] PyTorch 分布式(16) --- 使用异步执行实现批处理 RPC

[源码解析] PyTorch 分布式(17) --- 结合DDP和分布式 RPC 框架

注:本文没有完全按照原文顺序进行翻译,而是按照自己理解的思路重新组织了文章。原文是从下至上,从细节到整体的顺序分析,但是我在理解时候总觉得别扭,缺乏一个总体的感知,所以我们还是以从上到下的逻辑,配合图例进行分析。

0x01 综述

1.1 先决条件

本教程使用 Resnet50 模型来演示使用torch.distributed.rpc API实现分布式管道并行。这可以看作是单机模型并行最佳实践中讨论的多 GPU 流水线并行的分布式对应版本。

本文的先决条件如下:

注意

  • 本教程需要 PyTorch v1.6.0 或更高版本。

  • 本教程的完整源代码可以在pytorch/examples找到 。

1.2 基础知识

之前的教程分布式 RPC 框架入门 展示了如何使用torch.distributed.rpc 为 RNN 模型实现分布式模型并行。该教程使用一个 GPU 来托管EmbeddingTable,并且提供的代码运行良好。但是,如果模型存在于多个 GPU 上,则需要一些额外的步骤来提高所有 GPU 的摊销利用率。管道并行就是一种在这种情况下可以提供帮助的范式。

在本教程中,我们使用ResNet50作为示例模型,单机模型并行最佳实践 教程也使用该模型。类似地,ResNet50模型被分成两个分片,输入批次被分成多个分片,并以流水线方式输入到两个模型分片中。不同之处在于,本教程不是使用 CUDA 流并行执行,而是调用异步 RPC。因此,本教程中提供的解决方案也适用于跨机器边界。本教程的其余部分将分四个步骤介绍实现。

0x02 启动

下面的代码显示了所有进程的目标函数,在所有节点上都会运行 run_worker,但是其执行代码不同。

  • 主要逻辑定义在run_master之中,这是本系统的大脑和实际执行者。
  • worker 被动地等待来自 master 的命令,因此只运行init_rpcand shutdown
    • init_rpc只是建立分布式环境。
    • shutdown默认情况下将阻塞,直到所有 RPC 参与者结束工作。
    • 具体业务工作都是master通过RPC直接调度到worker节点上来运行。
def run_worker(rank, world_size, num_split):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500' # Higher timeout is added to accommodate for kernel compilation time in case of ROCm.
options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300) if rank == 0:
rpc.init_rpc(
"master",
rank=rank,
world_size=world_size,
rpc_backend_options=options
)
run_master(num_split)
else:
rpc.init_rpc(
f"worker{rank}",
rank=rank,
world_size=world_size,
rpc_backend_options=options
)
pass # block until all rpcs finish
rpc.shutdown() if __name__=="__main__":
world_size = 3
for num_split in [1, 2, 4, 8]:
tik = time.time()
mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
tok = time.time()
print(f"number of splits = {num_split}, execution time = {tok - tik}")

逻辑如下:

           torch.multiprocessing.spawn
+
|
|
+------------+----------------------------+
| |
| |
v v
+--------+---------------------------+ +------+----------+
| "ps" | | f"worker{rank}" |
| | | |
| rank = 0 | | rank = 1,2 |
| | | |
| run_worker +----> run_master | | run_worker |
| | | |
+------------------------------------+ +-----------------+

0x03 定义训练循环

现在我们看看训练循环(training loop)。我们使用专门的 "master " worker 来准备随机输入和标签,并控制分布式反向传播和分布式优化器step。

  • 它首先创建DistResNet50模块的一个实例,指定了每个批次的微批次数量,还提供了两个 RPC 工作线程的名称(即“worker1”和“worker2”)。

  • 然后定义了损失函数并使用parameter_rrefs()拿到了一个参数列表RRefs,以此创建了DistributedOptimizer

  • 最后,主训练循环与常规本地训练非常相似,不同之处在于它用于dist_autograd启动后向传播,并为后向传播和优化器 step()提供了 context_id

#########################################################
# Run RPC Processes #
######################################################### num_batches = 3
batch_size = 120
image_w = 128
image_h = 128 def run_master(split_size): # put the two model parts on worker1 and worker2 respectively
model = DistResNet50(split_size, ["worker1", "worker2"])
loss_fn = nn.MSELoss()
opt = DistributedOptimizer( # 分布式优化器
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
) one_hot_indices = torch.LongTensor(batch_size) \
.random_(0, num_classes) \
.view(batch_size, 1) for i in range(num_batches):
print(f"Processing batch {i}")
# generate random inputs and labels
inputs = torch.randn(batch_size, 3, image_w, image_h)
labels = torch.zeros(batch_size, num_classes) \
.scatter_(1, one_hot_indices, 1) # The distributed autograd context is the dedicated scope for the
# distributed backward pass to store gradients, which can later be
# retrieved using the context_id by the distributed optimizer.
with dist_autograd.context() as context_id:
outputs = model(inputs)
dist_autograd.backward(context_id, [loss_fn(outputs, labels)]) # 分布式梯度
opt.step(context_id)

我们先按照单机的思路来画图,下一节会再扩展。从单机角度看,好像没啥稀奇的地方。

           torch.multiprocessing.spawn
+
|
|
+------------+-------------------------------------------+
| |
| |
v v
+--------+----------------------------------------------+ +------+----------+
| "ps" | | f"worker{rank}" |
| | | |
| rank = 0 | | rank = 1,2 |
| | | |
| run_worker +----> run_master | | run_worker |
| + | | |
| | | | |
| | | +-----------------+
| v |
| +-------------------------+-------------------------+ |
| | | |
| | | |
| | model = DistResNet50(split_size, | |
| | ["worker1", "worker2"]) | |
| | loss_fn = nn.MSELoss() | |
| | opt = DistributedOptimizer( | |
| | optim.SGD, | |
| | model.parameter_rrefs(), | |
| | lr=0.05, | |
| | ) | |
| | for i in range(num_batches): | |
| | with dist_autograd.context() as context_id: | |
| | outputs = model(inputs) | |
| | dist_autograd.backward(context_id, | |
| | [loss_fn(outputs, labels)]) | |
| | opt.step(context_id) | |
| | | |
| | | |
| +---------------------------------------------------+ |
| |
+-------------------------------------------------------+

0x04 将 ResNet50 模型分片拼接成一个模块

我们这里先假定分片是个黑盒子。

  • 首先,我们创建一个DistResNet50模块来组装两个分片并实现流水线并行逻辑。在构造函数中,我们使用两次 rpc.remote调用将两个分片分别放在两个不同的 RPC 工作线程上,并保持RRef指向到两个模型部分,以便在前向传递中引用它们。

  • forward函数将输入批次拆分为多个微批次,并以流水线方式将这些微批次提供给两个模型部件。

    • 首先使用 rpc.remote调用将第一个分片应用于微批次,然后将中间输出RRef转发到第二个模型分片。
    • 之后收集所有微输出(micro-outputs)的Future ,并在循环后等待所有微输出。
    • 请注意,remote()rpc_async()都立即返回并异步运行。因此,整个循环是非阻塞的,并且会同时启动多个 RPC。
  • 一个 micro-batch 在两个模型部分上的执行顺序由一个中间输出y_rref变量来维护。微批次之间的执行顺序无关紧要。

  • 最后,前向函数将所有微批次的输出连接成一个单一的输出张量并返回。该parameter_rrefs函数可以让我们简化分布式优化器构建,后面会用到。parameter_rrefs 的作用是:从 worker 1,worker 2 取出每个分片需要优化的参数。最后这些参数会传递给DistributedOptimizer。

class DistResNet50(nn.Module):
"""
Assemble two parts as an nn.Module and define pipelining logic
"""
def __init__(self, split_size, workers, *args, **kwargs):
super(DistResNet50, self).__init__() self.split_size = split_size # Put the first part of the ResNet50 on workers[0]
self.p1_rref = rpc.remote(
workers[0], # 放到第一个worker之上
ResNetShard1,
args = ("cuda:0",) + args,
kwargs = kwargs
) # Put the second part of the ResNet50 on workers[1]
self.p2_rref = rpc.remote(
workers[1], # 放到第二个worker之上
ResNetShard2,
args = ("cuda:1",) + args,
kwargs = kwargs
) def forward(self, xs):
# Split the input batch xs into micro-batches, and collect async RPC
# futures into a list
out_futures = []
for x in iter(xs.split(self.split_size, dim=0)): # 将输入批次拆分为多个微批次
x_rref = RRef(x) # 封装成RRef
y_rref = self.p1_rref.remote().forward(x_rref) # 第一个worker处理微批次
z_fut = self.p2_rref.rpc_async().forward(y_rref) # 第二个worker继续处理
out_futures.append(z_fut) # collect and cat all output tensors into one tensor.
return torch.cat(torch.futures.wait_all(out_futures)) def parameter_rrefs(self):
remote_params = []
remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
return remote_params

为了演示,我们这里只画出了一个worker 1 的内部细节,请大家记住,worker 1 和 worker 2是一样的。同时,对 run_master 也进行了简化。流水线是在master的forward方法之中完成,具体在图上的1,2两个数字代表的箭头上体现。

         torch.multiprocessing.spawn
+
|
|
+------------+---------------------------------------------------+
| |
| |
v v
+------+----------------------------------------------+ +-------+--------------+
| "ps" rank = 0 | |"worker 1" rank = 1 |
| | | |
| run_worker DistributedOptimizer(p1_rref,p2_rref) | | run_worker |
| + | | |
| | | | |
| | DistResNet50 | | +-------------+ |
| | | +--------> |ResNetShard1 | |
| v | | | | | |
| run_master p1_rref +------------------------------------------> | | |
| + | | | +-------+-----+ |
| | | | | | |
| | p2_rref +-------------------------------+ | +----------------------+
| | | | | |
| | | | | |
| v | | | |
| +----+--------------------------------------------+ | | | |
| | model = DistResNet50(split_size, | | | | |
| | ["worker1", "worker2"]) | | | |1 |2
| | loss_fn = nn.MSELoss() | | | | |
| | opt = DistributedOptimizer( | | | | |
| | optim.SGD, | | | | |
| | model.parameter_rrefs(), | | | | |
| | ) | | | | v
| | for i in range(num_batches): | | | | +--------------+--------+
| | with dist_autograd.context() as context_id: | | | | | "worker 2" rank = 2 |
| | outputs = model(inputs) +----------------------+ | |
| | dist_autograd.backward(context_id, | | | | +--------------+ |
| | [loss_fn(outputs, labels)]) | | +------------> |ResNetShard2 | |
| | opt.step(context_id) | | | | | |
| +-------------------------------------------------+ | | +--------------+ |
+-----------------------------------------------------+ +-----------------------+

0x05 对 ResNet50 模型进行分区

这是ResNet50在两个模型分片中实现的准备步骤。下面的代码是从torchvision 中ResNet 实现中借用的。该ResNetBase模块包含两个 ResNet 分片(shards)的通用构建块和属性。

现在,我们已准备好定义两个模型分片。在构造函数之中,我们简单地将所有 ResNet50 层分成两部分,并将每个部分移动到提供的设备中。两个分片的forward功能如下:

  • 获取一个输入数据的RRef,这样就可以在本地获取数据,然后将其移动到预期的设备之上。
  • 将所有层应用于输入后,它将输出移动到 CPU 并返回。这是因为 RPC API 需要张量驻留在 CPU 上,以避免在调用方和被调用方中的设备数量不匹配时出现无效设备错误。
import threading
import torch
import torch.nn as nn
from torchvision.models.resnet import Bottleneck num_classes = 1000 def conv1x1(in_planes, out_planes, stride=1):
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) class ResNetBase(nn.Module):
def __init__(self, block, inplanes, num_classes=1000,
groups=1, width_per_group=64, norm_layer=None):
super(ResNetBase, self).__init__() self._lock = threading.Lock()
self._block = block
self._norm_layer = nn.BatchNorm2d
self.inplanes = inplanes
self.dilation = 1
self.groups = groups
self.base_width = width_per_group # 辅助函数,用来构建Sequential
def _make_layer(self, planes, blocks, stride=1):
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if stride != 1 or self.inplanes != planes * self._block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * self._block.expansion, stride),
norm_layer(planes * self._block.expansion),
) layers = []
layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
self.base_width, previous_dilation, norm_layer))
self.inplanes = planes * self._block.expansion
for _ in range(1, blocks):
layers.append(self._block(self.inplanes, planes, groups=self.groups,
base_width=self.base_width, dilation=self.dilation,
norm_layer=norm_layer)) return nn.Sequential(*layers) def parameter_rrefs(self):
r"""
Create one RRef for each parameter in the given local module, and return a
list of RRefs.
"""
return [RRef(p) for p in self.parameters()] class ResNetShard1(ResNetBase):
"""
The first part of ResNet.
"""
def __init__(self, device, *args, **kwargs):
super(ResNetShard1, self).__init__(
Bottleneck, 64, num_classes=num_classes, *args, **kwargs) self.device = device # 配置设备
self.seq = nn.Sequential( # 构建Sequential模块
nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
self._norm_layer(self.inplanes),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
self._make_layer(64, 3),
self._make_layer(128, 4, stride=2)
).to(self.device) # 放到设备之上 for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias) def forward(self, x_rref):
x = x_rref.to_here().to(self.device) # 把输入放到设备之上
with self._lock:
out = self.seq(x) # 把所有层都应用到输入之上
return out.cpu() # 输出需要移动到CPU之上 class ResNetShard2(ResNetBase):
"""
The second part of ResNet.
"""
def __init__(self, device, *args, **kwargs):
super(ResNetShard2, self).__init__(
Bottleneck, 512, num_classes=num_classes, *args, **kwargs) self.device = device # 配置设备
self.seq = nn.Sequential( # 构建Sequential模块
self._make_layer(256, 6, stride=2),
self._make_layer(512, 3, stride=2),
nn.AdaptiveAvgPool2d((1, 1)),
).to(self.device) # 放到设备上 self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device) def forward(self, x_rref):
x = x_rref.to_here().to(self.device) # 把输入放到设备之上
with self._lock:
out = self.seq(x) # 把所有层都应用到输入之上
return out.cpu() # 输出需要移动到CPU之上

我们把目前逻辑拓展如下,这里:

  • DistResNet50 被分成两个部分,分别在 worker 1,worker 2之上。
  • 两个部分的参数通过RRef保存在master之上。
  • ps就是master,它负责驱动全部业务。
  • 通过分布式优化器和分布式autograd完成后向传播。
  • 两个worker就是简单执行而已:
    • 负责搭建分布式环境和等待结束。
    • 具体工作是由master通过RPC直接放到worker之上运行。
  • 流水线则是在master的forward 之上显式配置,由一个中间输出来完成,具体在图上的1,2两个数字代表的箭头上体现。
      torch.multiprocessing.spawn
+
|
|
+------------+---------------------------------------------------+---------------------+
| | |
| | |
v v |
+---+----------------------------------------------+ +--------+------------------+ |
| "ps" rank = 0 | |"worker 1“ rank = 1 | |
| | | | |
| run_worker DistributedOptimizer(p1_rref,p2_rref) | | run_worker | |
| + | | +----------------------+ | |
| | | | | ResNetShard1 | | |
| | DistResNet50 +--------+-------------------> | | +----------------+ | | |
| | | | | | | ResNetBase | | | |
| v | | | | | | | | |
| run_master p1_rref +--------------------------------------------> parameters()| | | |
| + | | | | | | | | |
| | | | +-> | | | | | | |
| | p2_rref +---------------------------+ | | | | | | | |
| | | | | | | | +----------------+ | | |
| | | | | | | +----------------------+ | |
| | | | | | +---------------------------+ |
| | | | | | | |
| | +----------------------------+ | |
| | | | | | | |
| | | | | | | |
| v | | | | | |
| +---+------------------------------------------+ | | | 1 | |2 |
| | model = DistResNet50(split_size, | | | | | | |
| | ["worker1", "worker2"]) | | | | | | |
| | loss_fn = nn.MSELoss() | | | | V v |
| | opt = DistributedOptimizer( | | | | +------------+--------------+ |
| | optim.SGD, | | | | |"worker 2" rank = 2 | |
| | model.parameter_rrefs(), | | | | | | |
| | ) | | | | | +--------------------+ | |
| | for i in range(num_batches): | | | | | | ResNetShard2 | | |
| | with dist_autograd.context() as context_id:| | | | | | +----------------+ | | |
| | | | | | | | | ResNetBase | | +<-+
| | outputs = model(inputs) +---------------------+ | | | | | |
| | | | | | | | | | |
| | dist_autograd.backward(context_id, | | +----------------> parameters()| | |
| | [loss_fn(outputs, labels)]) | | | | | | | |
| | opt.step(context_id) | | | | +----------------+ | |
| +----------------------------------------------+ | | +--------------------+ |
+--------------------------------------------------+ +---------------------------+

手机如下:

[源码解析] PyTorch 分布式(18) --- 使用 RPC 的分布式管道并行

至此,PyTorch 这几篇官方示例文章都剖析完毕,从下一篇我们开始介绍弹性训练,敬请期待。

0xFF 参考

[DISTRIBUTED PIPELINE PARALLELISM USING RPC](