[Source Code Analysis] PyTorch Distributed (13) ----- DistributedDataParallel: Backward Propagation

Date: 2023-03-08 22:57:20

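0x00 Abstract

The previous article analyzed the Reducer's role in forward propagation. This article continues with how backward propagation is performed.

Other articles in this series:

Automatic Differentiation, a Powerful Tool for Deep Learning (1)

Automatic Differentiation, a Powerful Tool for Deep Learning (2)

[Source Code Analysis] Automatic Differentiation, a Powerful Tool for Deep Learning (3) --- Walking Through Examples

[Source Code Analysis] How PyTorch Implements Forward Propagation (1) --- Basic Classes (Part 1)

[Source Code Analysis] How PyTorch Implements Forward Propagation (2) --- Basic Classes (Part 2)

[Source Code Analysis] How PyTorch Implements Forward Propagation (3) --- Concrete Implementation

[Source Code Analysis] How PyTorch Implements Backward Propagation (1) ---- Calling the Engine

[Source Code Analysis] How PyTorch Implements Backward Propagation (2) ---- Static Structure of the Engine

[Source Code Analysis] How PyTorch Implements Backward Propagation (3) ---- Dynamic Logic of the Engine

[Source Code Analysis] How PyTorch Implements Backward Propagation (4) ---- Concrete Algorithm

[Source Code Analysis] PyTorch Distributed (1) ------ History and Overview

[Source Code Analysis] PyTorch Distributed (2) ----- DataParallel (Part 1)

[Source Code Analysis] PyTorch Distributed (3) ----- DataParallel (Part 2)

[Source Code Analysis] PyTorch Distributed (4) ------ Basic Concepts of Distributed Applications

[Source Code Analysis] PyTorch Distributed (5) ------ DistributedDataParallel Overview & How to Use It

[Source Code Analysis] PyTorch Distributed (6) --- DistributedDataParallel -- Initialization & Store

[Source Code Analysis] PyTorch Distributed (7) ----- DistributedDataParallel: Process Groups

[Source Code Analysis] PyTorch Distributed (8) -------- DistributedDataParallel: The Paper

[Source Code Analysis] PyTorch Distributed (9) ----- DistributedDataParallel: Initialization

[Source Code Analysis] PyTorch Distributed (10) ------ DistributedDataParallel: Static Architecture of the Reducer

[Source Code Analysis] PyTorch Distributed (11) ----- DistributedDataParallel: Building the Reducer and the Join Operation

[Source Code Analysis] PyTorch Distributed (12) ----- DistributedDataParallel: Forward Propagation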

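0x01 Review

1.1 Recap of the previous article

The previous article walked through the forward pass. Once the forward pass finishes, we have the following:

  • The parameters that require gradients have been assigned to buckets.
  • The buckets have been rebuilt.
  • Forward propagation has completed.
  • Starting from the specified outputs, the autograd graph has been traversed backwards to find all unused parameters, and each of them has been marked ready.

This gives DDP the basis for gradient reduction: it knows which parameters can be reduced directly without any work from the autograd engine (those already in the ready state) and which parameters can be communicated and reduced together (the buckets). From here on the PyTorch autograd engine is in control: it performs the backward computation while the cross-process gradient reduction runs alongside it.

1.2 Overall logic

The overall strategy for the backward pass is as follows:

Backward Pass:

  • backward() is called directly on the loss. This is autograd's job and is outside DDP's control, so DDP uses hooks to achieve its goal.
    • DDP registers autograd hooks at construction time.
    • The autograd engine computes the gradients.
    • When a gradient becomes ready, the corresponding DDP hook on that gradient accumulator fires.
  • The all-reduce is driven from inside autograd_hook. Given a parameter index param_index, the hook looks up the parameter and marks it ready; once every gradient in a bucket is ready, the bucket itself is ready.
  • When all gradients in a bucket are ready, the Reducer launches an asynchronous allreduce on that bucket to compute the mean gradient across all processes.
  • Once all buckets are ready, the Reducer blocks until every allreduce has finished, and then writes the averaged gradients into the param.grad field of every parameter.
  • The gradients of all processes are reduced, so after the update every process holds the same model weights. Consequently, once the backward pass finishes, the grad field of the same parameter should be equal across the different DDP processes.
  • After the gradients have been reduced, they are handed back to the autograd engine.
  • Unlike DP, DDP does not need to broadcast parameters after every iteration. Buffers, however, still have to be broadcast from the rank 0 process to the other processes in every iteration.

Next, let us look at how backward propagation is carried out.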

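0x02 Starting from the Hook

The figure below comes from a paper by Kuaishou (see reference 1; that project will probably be analyzed later in this series). The upper half of the figure shows how the native autograd engine works, and the lower half shows how Horovod and Torch-DDP work. It shows that gradient reduction already starts during the backward pass.

[Figure: native autograd engine (top) compared with Horovod and Torch-DDP (bottom)]

Concretely, besides bucketing, the Reducer also registers autograd hooks during construction, one hook per parameter. During the backward pass these hooks fire as gradients become ready and perform the gradient reduction. Once every gradient in a bucket is ready, the bucket is ready, and the Reducer launches an asynchronous allreduce on that bucket to compute the mean gradient across all processes. We therefore start the analysis at the entry point of backward propagation: the hook.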

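2.1 How hooks are registered

We first look at how hooks are registered, which involves AutogradMeta and Node.

2.1.1 AutogradMeta

AutogradMeta records the autograd history of a Variable. Its main member variables are:

  • grad_: stores the gradient of the current Variable instance; it is itself a Variable.
  • grad_fn: a Node instance that only non-leaf nodes have. It is accessed through the grad_fn() method. In fact, PyTorch decides whether a Variable is a leaf variable by checking whether grad_fn is null.
  • grad_accumulator_: also a Node instance, which only leaf nodes have.
    • It is accessed through the Variable's grad_accumulator().
    • Leaf nodes are responsible for accumulating gradients, and grad_accumulator_ is the function that performs the accumulation.
    • The corresponding gradient is stored in grad_.
  • output_nr_: a number indicating which output of the Node this Variable is. For example, 0 means this Variable is the Node's first output.
  • To summarize:
    • For a non-leaf node, grad_fn is the operation that computes the gradient. The gradient is not accumulated in grad_ but passed on to the next stop of backward propagation in the graph. grad_fn is a Node.
    • For a leaf node, PyTorch creates a special virtual operation that outputs the leaf. This virtual operation also serves as the leaf's grad_accumulator_ and accumulates its gradient into grad_, which is why a leaf's output_nr_ is always 0. grad_accumulator_ is also a Node, namely AccumulateGrad.

It is defined as follows: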

struct TORCH_API AutogradMeta : public c10::AutogradMetaInterface {
  std::string name_;

  Variable grad_;
  std::shared_ptr<Node> grad_fn_;
  std::weak_ptr<Node> grad_accumulator_;

  // This field is used to store all the forward AD gradients
  // associated with this AutogradMeta (and the Tensor it corresponds to)
  std::shared_ptr<ForwardGrad> fw_grad_;

  std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
  std::shared_ptr<hooks_list> cpp_hooks_list_;

  // Only meaningful on leaf variables (must be false otherwise)
  bool requires_grad_;
  // Only meaningful on non-leaf variables (must be false otherwise)
  bool retains_grad_;
  bool is_view_;

  // The "output number" of this variable; e.g., if this variable
  // was the second output of a function, then output_nr == 1.
  // We use this to make sure we can setup the backwards trace
  // correctly when this variable is passed to another function.
  uint32_t output_nr_;
  mutable std::mutex mutex_;
};

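2.1.2 Node

In the computation graph, each operation is represented by a node (Node), and different Node subclasses implement different operations.

Both grad_fn_ and grad_accumulator_ of AutogradMeta are Nodes.

The member variable of interest here is post_hooks_: the hooks that are executed after the gradient computation has run.

add_post_hook appends a hook to post_hooks_.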

struct TORCH_API Node : std::enable_shared_from_this<Node> {
 public:
  std::vector<std::unique_ptr<FunctionPreHook>> pre_hooks_;
  std::vector<std::unique_ptr<FunctionPostHook>> post_hooks_;

  uintptr_t add_post_hook(std::unique_ptr<FunctionPostHook>&& post_hook) {
    post_hooks_.push_back(std::move(post_hook));
    // Use the raw pointer as the unique key to identify this hook. This key
    // can then be used in del_post_hook(key) to remove this hook.
    return reinterpret_cast<std::uintptr_t>(post_hooks_.back().get());
  }
};
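
To make the post-hook mechanism concrete, here is a minimal, self-contained sketch. It is not PyTorch code: ToyNode, PostHook and run_post_hook_demo are hypothetical names made up for illustration, and the hook takes a plain int instead of the variable_list arguments a real FunctionPostHook receives. It only shows the pattern behind add_post_hook: hooks are appended to a list, the raw pointer serves as a key, and every hook fires after the node's own computation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for FunctionPostHook: a callable that receives a parameter index.
using PostHook = std::function<void(int)>;

struct ToyNode {
  std::vector<std::unique_ptr<PostHook>> post_hooks_;

  // Same idea as Node::add_post_hook: store the hook, return its address as a key.
  std::uintptr_t add_post_hook(std::unique_ptr<PostHook> post_hook) {
    assert(post_hook != nullptr);
    post_hooks_.push_back(std::move(post_hook));
    return reinterpret_cast<std::uintptr_t>(post_hooks_.back().get());
  }

  // Toy "apply": pretend the gradient was accumulated, then fire every post hook.
  void apply(int param_index) {
    for (const auto& hook : post_hooks_) {
      (*hook)(param_index);
    }
  }
};

// Registers one hook on a fresh node, runs the node once, and returns
// the parameter indices the hook observed.
std::vector<int> run_post_hook_demo() {
  std::vector<int> ready;  // filled by the hook, in the spirit of autograd_hook(index)
  ToyNode accumulate_grad;
  accumulate_grad.add_post_hook(std::make_unique<PostHook>(
      [&ready](int index) { ready.push_back(index); }));
  accumulate_grad.apply(7);
  return ready;
}
```

Calling run_post_hook_demo() yields a vector holding the single index 7, because the hook runs exactly once after apply.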

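2.1.3 AccumulateGrad

AccumulateGrad is a class derived from Node.

2.2 Constructor

Recall the Reducer constructor, which does the following:

  • For every tensor, it obtains the grad_accumulator_ of its Variable::AutogradMeta, i.e. the gradient accumulator used to accumulate the gradient of a leaf Variable.
  • It attaches an autograd_hook to every gradient accumulator. The hook hangs on the autograd graph and is responsible for gradient synchronization during backward.
  • It fills gradAccToVariableMap_ with the mapping from grad_accumulator to index (i.e. from the accumulator pointer to the parameter tensor), which makes it easy to find unused parameters later when traversing the autograd graph.
  • The gradient accumulators are stored in grad_accumulators_.

The code is as follows: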

Reducer::Reducer(
    std::vector<std::vector<at::Tensor>> replicas, // tensors
    std::vector<std::vector<size_t>> bucket_indices, // bucket information
    ......) {

    for (size_t replica_index = 0; replica_index < replica_count; // iterate over replicas
         replica_index++) {

      for (size_t variable_index = 0; variable_index < variable_count; // iterate over tensors
           variable_index++) {
        auto& variable = replicas_[replica_index][variable_index]; // the concrete tensor
        const auto index = VariableIndex(replica_index, variable_index); // one index per tensor
        // Get grad_accumulator_ from Variable::AutogradMeta, i.e. the gradient
        // accumulator used for the leaf Variable
        auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);

        hooks_.emplace_back(
            // Add a hook to the accumulator. The hook hangs on the autograd graph
            // and synchronizes gradients during backward.
            // autograd_hook runs right after grad_accumulator has executed.
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& ) {
#ifndef _WIN32
                      this->rpc_context_.set(
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index); // call the Reducer's autograd_hook
                      return outputs;
                    })),
            grad_accumulator);

        // gradAccToVariableMap_ stores the mapping grad_accumulator -> index
        // (accumulator pointer -> parameter tensor), which makes it easy to find
        // unused parameters later when traversing the autograd graph
        if (find_unused_parameters_) {
          gradAccToVariableMap_[grad_accumulator.get()] = index;
        }

        grad_accumulators_[replica_index][variable_index] =
            std::move(grad_accumulator);
      }
    }
  }
}

2.2.1 grad_accumulator

The code of grad_accumulator is shown below. It simply fetches the tensor's autograd_meta->grad_accumulator_ (creating it on first use) and returns it. For a leaf node, grad_accumulator_ is an AccumulateGrad.

std::shared_ptr<Node> grad_accumulator(const Variable& self) {
  auto autograd_meta = get_autograd_meta(self); // get autograd_meta
  if (!autograd_meta) {
    return nullptr;
  }
  if (autograd_meta->grad_fn_) {
    throw std::logic_error(
        "grad_accumulator() should be only called on leaf Variables");
  }
  if (!autograd_meta->requires_grad_) {
    return nullptr;
  }

  std::lock_guard<std::mutex> lock(autograd_meta->mutex_);

  // Return autograd_meta->grad_accumulator_ if it already exists
  auto result = autograd_meta->grad_accumulator_.lock();
  if (result)
    return result;

  c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
  auto intrusive_from_this = c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
  result = std::make_shared<AccumulateGrad>(Variable(std::move(intrusive_from_this)));
  autograd_meta->grad_accumulator_ = result; // cache it in autograd_meta->grad_accumulator_
  return result;
}

2.2.2 Diagram

Take a tensor variable1 whose VariableIndex is index1. The configuration is shown below: after AccumulateGrad has computed the gradient in apply, it calls the hooks in post_hooks.

+-----------------------------------------+
| Reducer                                 |
|                                         |
|                                         |
|  +------------------------------------+ |   +------------------+    +----------------+
|  | grad_accumulators_                 | |   |  variable1       |    | AccumulateGrad |
|  |                                    | |   |                  |    |                |
|  |                                    | |   |                  |    |                |
|  |  [replica_index][variable_index]+------> |   autograd_meta_+---> |    post_hooks  |
|  |                                    | |   |                  |    |        +       |
|  |                                    | |   |                  |    |        |       |
|  +------------------------------------+ |   +------------------+    +----------------+
|                                         |                                    |
|  +-------------------------------+      |                                    |
|  | gradAccToVariableMap_         |      |                                    v
|  |                               |      |
|  |                               |      |                    +-----------------------+
|  |         [variable1 : index1]  |      |                    |  autograd_hook(index1)|
|  |                               |      |                    +-----------------------+
|  +-------------------------------+      |
|                                         |
+-----------------------------------------+                    +---------------------------------------+
                                                   index1 +--> |VariableIndex                          |
                                                               |                                       |
                                                               |          replica_index of Variable1   |
                                                               |                                       |
                                                               |          variable_index of Variable1  |
                                                               |                                       |
                                                               +---------------------------------------+

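2.3 The hook function

When a gradient is ready, the engine calls back into the hook function. The hook is the autograd_hook method below, which decides, based on a few conditions, whether the variable should be marked ready. The logic is:

  • If this is a dynamic graph with unused-parameter detection enabled, or the first iteration of a static graph, set the variable's slot in local_used_maps_ to 1.

    • local_used_maps_ holds CPU tensors that record which parameters were used locally.
    • A dynamic graph may differ from one iteration to the next, so buckets and variables may differ each time; local_used_maps_ therefore has to be updated in every iteration.
    • A static graph is identical in every iteration, so it is enough to set the slot in the callback during the first iteration.
  • If this is the first iteration of a static graph, increment the variable's entry in numGradHooksTriggeredMap_ and return.

  • If unused parameters have not yet been marked in this iteration, iterate over the unused variables and mark each of them ready by calling mark_variable_ready.

  • For a static graph after the first iteration, decrement the variable's entry in numGradHooksTriggeredMapPerIteration_; when it reaches 0, mark the variable ready by calling mark_variable_ready.

  • Otherwise the graph is dynamic, and the variable is marked ready on every call by calling mark_variable_ready.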

// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
  std::lock_guard<std::mutex> lock(this->mutex_);

  // Carry over thread local state from main thread. This allows for
  // thread-local flags such as profiler enabled to be configured correctly.
  at::ThreadLocalStateGuard g(thread_local_state_);

  // Ignore if we don't expect to be called.
  // This may be the case if the user wants to accumulate gradients
  // for number of iterations before reducing them.
  if (!expect_autograd_hooks_) {
    return;
  }

  // Note [Skip allreducing local_used_maps_dev]
  // ~~~~~~~~~~~~~~~~~~~~~~~~~~
  // If find_unused_parameters_ is set to false, there is no need to allreduce
  // local_used_maps_dev_, because all parameters will be reduced anyway.
  // Therefore, we can avoid allocating memory for local_used_maps and
  // local_used_maps_dev_ if find_unused_parameters_ is false.

  // See Note [Skip allreducing local_used_maps_dev]
  // Dynamic graph with unused-parameter detection, or first iteration of a static graph
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Since it gets here, this param has been used for this iteration. We want
    // to mark it in local_used_maps_. During no_sync session, the same var can
    // be set multiple times, which is OK as does not affect correctness. As
    // long as it is used once during no_sync session, it is marked as used.
    // local_used_maps_ holds CPU tensors recording the locally used parameters.
    // A dynamic graph may differ in every iteration (buckets and variables may
    // change), so local_used_maps_ has to be updated in every iteration.
    // A static graph is identical in every iteration, so setting it in the
    // callback during the first iteration is enough.
    local_used_maps_[index.replica_index][index.variable_index] = 1;
  }

  if (static_graph_first_iteration()) { // first iteration of a static graph
    numGradHooksTriggeredMap_[index] += 1; // only incremented in the first static-graph iteration
    return;
  }

  // If `find_unused_parameters_` is true there may be model parameters that
  // went unused when computing the model output, they won't be part of the
  // autograd graph, and won't receive gradients. These parameters are
  // discovered in the `prepare_for_backward` function and their indexes stored
  // in the `unused_parameters_` vector.
  if (!has_marked_unused_parameters_) {
    has_marked_unused_parameters_ = true;
    for (const auto& unused_index : unused_parameters_) { // iterate over unused variables
      mark_variable_ready(unused_index); // an unused variable is ready by definition
    }
  }

  // If it is static graph, after 1st iteration, check a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) { // from the second iteration on
    // Why only from the second iteration? In the first iteration the gradient is
    // not yet ready when execution gets here (it has not been processed by the
    // Reducer, and it only counts as ready after the Reducer has processed it).
    // For a static graph, numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index); // the count reached 0, so the variable is ready
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index); // a dynamic graph marks the variable ready on every call
  }
}
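
The static-graph branch is the least obvious part of the hook, so here is a small standalone sketch of just that counting scheme. It is a simplification under stated assumptions, not the Reducer itself: ToyStaticGraphCounter, begin_iteration, on_hook and count_ready_signals are hypothetical names, parameters are identified by a plain size_t, and the per-iteration reset is modeled with an explicit begin_iteration call (the source only tells us that numGradHooksTriggeredMapPerIteration_ is set from numGradHooksTriggeredMap_).

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>

// Toy model of the static-graph bookkeeping; every name here is illustrative.
struct ToyStaticGraphCounter {
  std::unordered_map<std::size_t, int> triggered;      // in the spirit of numGradHooksTriggeredMap_
  std::unordered_map<std::size_t, int> per_iteration;  // in the spirit of numGradHooksTriggeredMapPerIteration_
  bool first_iteration = true;

  // Assumed to run before each iteration after the first one:
  // reload the per-iteration counters from the recorded totals.
  void begin_iteration() {
    first_iteration = false;
    per_iteration = triggered;
  }

  // Returns true when this hook call should mark the parameter ready.
  bool on_hook(std::size_t index) {
    if (first_iteration) {
      triggered[index] += 1;  // first iteration: only count how often the hook fires
      return false;
    }
    auto it = per_iteration.find(index);
    assert(it != per_iteration.end() && it->second > 0);
    return --(it->second) == 0;  // ready only on the last expected hook call
  }
};

// Fires the hook `times` times for one parameter and counts the "ready" signals.
int count_ready_signals(ToyStaticGraphCounter& counter, std::size_t index, int times) {
  int ready = 0;
  for (int i = 0; i < times; ++i) {
    ready += counter.on_hook(index) ? 1 : 0;
  }
  return ready;
}
```

With a parameter whose hook fires twice per backward pass, the first iteration produces no ready signal (it only records the count of 2), and each later iteration produces exactly one, on the second call.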

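0x03 Ready

During backward propagation, when the hook of a parameter finds that the variable is ready, it calls mark_variable_ready(index). Let us see what happens next.

Roughly, the order is: handle the ready variable, handle the ready bucket, handle the usage information, and copy the corresponding gradients from DDP back to autograd.

3.1 Variable ready

3.1.1 Marking a variable ready

mark_variable_ready marks one variable as ready. The logic is as follows.

  • If buckets need to be rebuilt, insert index into the rebuild list.

    • Buckets are rebuilt when: 1) it is the first time buckets are rebuilt; 2) static graph is true or find-unused-parameters is false; 3) this backward pass needs to run allreduce.
    • At this point we only dump the tensors and their parameter indices into the rebuilt params and rebuilt param indices, in the order in which the gradients arrive. At the end of finalize_backward(), the buckets are rebuilt from these two lists and then broadcast and initialized. Only the tensors and parameter indices of one replica need to be dumped.
  • Find the replica index of this variable and its position within the replica.

  • This variable has been used, so record it by inserting it into perIterationReadyParams_.

  • Whenever a variable is marked ready, set the flag that requires finalize to be called.

  • Check whether all gradients in the bucket are ready; if nothing is pending, the bucket is ready too.

  • Decrement the pending count of this model replica, because one more tensor is ready.

  • If the pending count of this replica reaches 0, decrement the pending count of the bucket.

    • A replica pending count of 0 means the bucket has one fewer replica still pending.
    • If the bucket's pending count reaches 0, call mark_bucket_ready to mark the bucket ready.
  • If all buckets are ready:

    • Call all_reduce_local_used_map (only for a dynamic graph with unused-parameter detection).
    • Call Engine::get_default_engine().queue_callback to register a callback. The engine invokes this callback after the whole backward pass has finished; the callback calls finalize_backward, which later reduces the used variables.
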
void Reducer::mark_variable_ready(VariableIndex index) {
  // Rebuild bucket only if 1) it is the first time to rebuild bucket 2)
  // static_graph_ is true or find_unused_parameters_ is false,
  // 3) this backward pass needs to run allreduce.
  // Here, we just dump tensors and their parameter indices into
  // rebuilt_params_ and rebuilt_param_indices_ based on gradient arriving
  // order, and then at the end of finalize_backward(), buckets will be
  // rebuilt based on rebuilt_params_ and rebuilt_param_indices_, and then
  // will be broadcasted and initialized. Also we only need to dump tensors
  // and parameter indices of one replica.
  if (should_rebuild_buckets()) {
    push_rebuilt_params(index); // if a rebuild is needed, add index to the rebuild list
  }

  const auto replica_index = index.replica_index; // replica index
  const auto variable_index = index.variable_index; // position within the replica

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index); // this variable was used, record it
  }
  backward_stats_[replica_index][variable_index] =
      current_time_in_nanos() - cpu_timer_.backward_compute_start_time;

  // Any time we mark a variable ready (be it in line due to unused parameters,
  // or via an autograd hook), we require a call to the finalize function. If
  // this doesn't happen before the next iteration (or call to
  // `prepare_for_backwards`), we know something is wrong.
  require_finalize_ = true; // finalize must be called whenever a variable is marked ready

  const auto& bucket_index = variable_locators_[variable_index]; // locator of the variable
  auto& bucket = buckets_[bucket_index.bucket_index]; // the bucket that holds the variable
  auto& replica = bucket.replicas[replica_index]; // the replica

  set_divide_factor();

  if (bucket.expect_sparse_gradient) {
    mark_variable_ready_sparse(index); // sparse variable
  } else {
    mark_variable_ready_dense(index); // dense variable
  }

  // TODO(@pietern): Make this work for both CPU/CUDA tensors.
  // When using CPU tensors we don't need to do this.
  // // Record event so that we can wait for all of them.
  // auto& event = replica.events[bucket_index.intra_bucket_index];
  // event.record();

  // Check if this was the final gradient for this bucket.
  // If nothing in the bucket is pending any more, the bucket is ready too.
  if (--replica.pending == 0) { // one more tensor of this replica is ready
    // Kick off reduction if all replicas for this bucket are ready.
    if (--bucket.pending == 0) { // this replica is done, so the bucket has one fewer pending replica
      mark_bucket_ready(bucket_index.bucket_index); // the bucket is ready
    }
  }

  // Run finalizer function and kick off reduction for local_used_maps once the
  // final bucket was marked ready.
  if (next_bucket_ == buckets_.size()) { // all buckets are ready

    if (dynamic_graph_find_unused()) {
      all_reduce_local_used_map(); // reduce the usage map of the variables
    }

    // The autograd engine uses the default stream when running callbacks, so we
    // pass in the current CUDA stream in case it is not the default.
    const c10::Stream currentStream = get_current_stream();
    // Register finalize_backward with the engine
    torch::autograd::Engine::get_default_engine().queue_callback([=] {
      std::lock_guard<std::mutex> lock(this->mutex_);
      // Run callback with the current stream
      c10::OptionalStreamGuard currentStreamGuard{currentStream};
      if (should_collect_runtime_stats()) {
        record_backward_compute_end_time();
      }
      // Check that all buckets were completed and had their work kicked off.
      TORCH_INTERNAL_ASSERT(next_bucket_ == buckets_.size());
      this->finalize_backward();
    });
  }
}

The logic is as follows:

  1. The Reducer registers autograd_hook on the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter is ready, it calls autograd_hook.
  3. Processing continues inside autograd_hook.
  4. A finalize_backward callback is registered with the engine.

Engine        AccumulateGrad                Reducer

  +                  +                         +
  |                  |                         |
  |                  |           1             |
  |                  | <-----------------------+
  |                  |
  |                  |
  |                  |
  |                  v           2
  |             post_hooks  +-------->  autograd_hook
  |                                            +
  |                                            |
  |                                            | 3
  |                                            v
  |                         +------------------+---------------------------+
  |                         |    mark_variable_ready                       |
  |                         |                                              |
  |                         |                                              |
  |                         |     All variable in replica are ready?       |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  |                         |     All replica in bucket are ready?         |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  |                         |          mark_bucket_ready                   |
  |                         |                                              |
  |                         |                                              |
  |                         |                                              |
  |                         |                  +                           |
  |                         |                  |                           |
  |                         |                  |                           |
  |                         |                  v                           |
  |                         |       All buckets are ready?                 |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  | queue_callback  4       |     all_reduce_local_used_map                |
  | <----------------------------+  queue_callback(finalize_backward)      |
  |                         |                                              |
  |                         |                                              |
  v                         +----------------------------------------------+

3.1.2 Registering the callback

The code above uses torch::autograd::Engine::get_default_engine().queue_callback to register a callback. Let us look at it.

It is defined in the engine and simply appends the callback to final_callbacks_:

void Engine::queue_callback(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(current_graph_task->final_callbacks_lock_);
current_graph_task->final_callbacks_.emplace_back(std::move(callback));
}

final_callbacks_ is processed in exec_post_processing: the callbacks are invoked once the engine has completed the whole backward pass.

void GraphTask::exec_post_processing() {
  if (!not_ready_.empty()) {
    throw std::runtime_error("could not compute gradients for some functions");
  }

  // set the thread_local current_graph_task_ as more callbacks can be installed
  // by existing final callbacks.
  GraphTaskGuard guard(shared_from_this());
  // Lock mutex during each iteration for accessing final_callbacks.size()
  // Unlocking is necessary, because the callback can register
  // more callbacks (or they can be registered from other threads
  // while it's waiting.
  std::unique_lock<std::mutex> cb_lock(final_callbacks_lock_);
  // WARNING: Don't use a range-for loop here because more callbacks may be
  // added in between callback calls, so iterators may become invalidated.
  // NOLINTNEXTLINE(modernize-loop-convert)
  for (size_t i = 0; i < final_callbacks_.size(); ++i) {
    cb_lock.unlock();
    final_callbacks_[i](); // invoke the callback
    cb_lock.lock();
  }

  // Syncs leaf streams with default streams (if necessary)
  // See note "Streaming backwards"
  for (const auto& leaf_stream : leaf_streams) {
    const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
    const auto default_stream = guard.getDefaultStream(leaf_stream.device());
    if (leaf_stream != default_stream) {
      auto event = c10::Event{c10::DeviceType::CUDA};
      event.record(leaf_stream);
      default_stream.wait(event);
    }
  }
}
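
The index-based loop is what allows a final callback to queue further callbacks while the list is being drained. The sketch below isolates that pattern without any PyTorch dependency. ToyGraphTask and run_callbacks_demo are hypothetical names, and unlike the listing above the sketch copies each callback out of the vector before unlocking; that copy is a conservative choice of this example, not something taken from the PyTorch source.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Toy stand-in for the part of GraphTask that stores final callbacks.
struct ToyGraphTask {
  std::mutex final_callbacks_lock_;
  std::vector<std::function<void()>> final_callbacks_;

  void queue_callback(std::function<void()> callback) {
    assert(callback && "callback must be callable");
    std::lock_guard<std::mutex> lock(final_callbacks_lock_);
    final_callbacks_.emplace_back(std::move(callback));
  }

  // Runs every queued callback, including those queued while draining.
  // Returns the number of callbacks executed.
  std::size_t exec_post_processing() {
    std::size_t executed = 0;
    std::unique_lock<std::mutex> cb_lock(final_callbacks_lock_);
    // Index loop: size() is re-read on every pass, so late additions are seen.
    for (std::size_t i = 0; i < final_callbacks_.size(); ++i) {
      std::function<void()> callback = final_callbacks_[i];  // copy while locked
      cb_lock.unlock();  // release so the callback may call queue_callback
      callback();
      cb_lock.lock();
      ++executed;
    }
    return executed;
  }
};

// Queues one callback that itself queues a second one; returns how many ran.
int run_callbacks_demo() {
  ToyGraphTask task;
  int calls = 0;
  task.queue_callback([&task, &calls] {
    ++calls;
    task.queue_callback([&calls] { ++calls; });
  });
  task.exec_post_processing();
  return calls;
}
```

In this toy setup both callbacks run, so run_callbacks_demo() returns 2. A range-for loop over the same vector would not be safe here, since appending during iteration can invalidate its iterators.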

The logic therefore extends as follows:

  1. The Reducer registers autograd_hook on the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter is ready, it calls autograd_hook.
  3. Processing continues inside autograd_hook.
  4. A finalize_backward callback is registered with the engine.
  5. GraphTask::exec_post_processing calls finalize_backward.

Engine        AccumulateGrad                Reducer

  +                  +                         +
  |                  |                         |
  |                  |           1             |
  |                  | <-----------------------+
  |                  |
  |                  |
  |                  |
  |                  v           2
  |             post_hooks  +-------->  autograd_hook
  |                                            +
  |                                            |
  |                                            | 3
  |                                            v
  |                         +------------------+---------------------------+
  |                         |    mark_variable_ready                       |
  |                         |                                              |
  |                         |                                              |
  |                         |     All variable in replica are ready?       |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  |                         |     All replica in bucket are ready?         |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  |                         |          mark_bucket_ready                   |
  |                         |                                              |
  |                         |                                              |
  |                         |                                              |
  |                         |                  +                           |
  |                         |                  |                           |
  |                         |                  |                           |
  |                         |                  v                           |
  |                         |       All buckets are ready?                 |
  |                         |                  +                           |
  |                         |                  | YES                       |
  |                         |                  v                           |
  | queue_callback  4       |     all_reduce_local_used_map                |
  | <----------------------------+  queue_callback(finalize_backward)      |
  |                         |                                              |
  |                         |                                              |
  |                         +------------------+---------------------------+
  v                                            |
                                               |
GraphTask::exec_post_processing                |
  +                                            |
  |                                            |
  |              5                             v
  +----------------------------------> finalize_backward
  |                                            +
  |                                            |
  |                                            |
  v                                            v

3.1.3 mark_variable_ready_sparse

mark_variable_ready_sparse handles variables of sparse type. In essence it copies the gradient into the Reducer.

void Reducer::mark_variable_ready_sparse(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index];
  auto& bucket = buckets_[bucket_index.bucket_index]; // which bucket
  auto& replica = bucket.replicas[replica_index]; // which replica of the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // which variable in the replica

  runGradCallbackForVariable(variable, [&](auto& grad) {
    TORCH_CHECK(grad.defined(), "Expected sparse gradient to be defined.");
    TORCH_CHECK(
        grad.options().layout() == c10::kSparse,
        "Expected variable to have sparse gradient.");

    // Sparse tensors cannot be grouped together with other sparse tensors
    // in a single reduction operation like we can for dense tensors.
    // Therefore, the `offsets` and `lengths` vectors in the bucket replica
    // struct are empty, and there is no pre-existing accumulation tensor.
    // Directly assign the sparse tensor to the `contents` field.
    replica.contents = grad; // assigned directly
    // See Note [DDP Communication Hook]
    if (comm_hook_ == nullptr) {
      replica.contents.div_(divFactor_);
    }
    // The grad is modified in place and needs to be written back.
    return true;
  });
}

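3.1.4 mark_variable_ready_dense

mark_variable_ready_dense handles dense tensors. In essence it copies the gradient into the Reducer.

We first look at one member variable, gradient_as_bucket_view_:

  • If it is false, the bucket has to be copied back into the grads after the allreduce of the bucket.

  • If it is set to True, the gradients are views pointing at different offsets of the allreduce communication buckets. This can reduce peak memory usage, and the amount of memory saved equals the total size of the gradients. It also avoids the overhead of copying between the gradients and the allreduce communication buckets. When gradients are views, detach_() cannot be called on them.

The logic of mark_variable_ready_dense is:

  • Use index to find which bucket and which replica the variable belongs to, then obtain the tensor variable in the replica, and from that the variable's offset and size. Finally obtain the bucket_view that corresponds to the tensor.
  • Process the tensor with runGradCallbackForVariable. runGradCallbackForVariable actually runs the callback through DistAutogradContext and finally writes the result back to DistAutogradContext.
  • Inside the callback the logic is:
    • When gradient_as_bucket_view_ is false, or even when it is true, users may in rare cases set grad to None after every iteration.
    • In those cases grad and bucket_view point to different storage, so grad has to be copied into bucket_view.
    • If gradient_as_bucket_view_ is true, grad is made to point to bucket_view.
    • If grad was already set to a view of the bucket in a previous iteration, no copy is needed.
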
void Reducer::mark_variable_ready_dense(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index];
  auto& bucket = buckets_[bucket_index.bucket_index]; // which bucket
  auto& replica = bucket.replicas[replica_index]; // which replica of the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // the variable in the replica
  const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // offset of the variable
  const auto length = replica.lengths[bucket_index.intra_bucket_index]; // size of the variable
  auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index]; // view to write into

  // Copy contents of gradient tensor to bucket tensor.
  // If the gradient is not set, we assume it wasn't computed
  // as part of the current backwards pass, and zero the part
  // of the bucket it would otherwise hold.
  runGradCallbackForVariable(variable, [&](auto& grad) {
    // grad is the gradient that belongs to the tensor
    if (grad.defined()) {
      this->check_grad_layout(grad, bucket_view);
      // When gradient_as_bucket_view_ is false, or even when
      // gradient_as_bucket_view_ is true, in rare cases users may set grad to
      // be None after every iteration. In these cases, grad and bucket_view are
      // pointing to different storages and thus need to copy grads to
      // bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
      // to bucket_view. If grad has already been set as views of buckets in
      // previous iterations, no copy is needed.
      if (!grad.is_alias_of(bucket_view)) {
        this->copy_grad_to_bucket(grad, bucket_view); // copy the gradient into contents
        if (gradient_as_bucket_view_) {
          // Let grad point to bucket_view buffer.
          grad = bucket_view; // to save memory, grad now points to bucket_view
          // The grad is modified and need to be written back.
          return true;
        }
      } else {
        // If grad and bucket view point to the same storage, no need to copy
        if (comm_hook_ == nullptr) {
          bucket_view.div_(divFactor_);
        }
      }
    } else {
      bucket_view.zero_(); // set to zero
    }
    // The grad is not modified and doesn't need to be written back.
    return false;
  });
}

copy_grad_to_bucket copies the gradient into contents:

void Reducer::copy_grad_to_bucket(
    const at::Tensor& grad,
    at::Tensor& bucket_view) {
  // See Note [DDP Communication Hook]
  if (comm_hook_ == nullptr) {
    auto wrapped = at::native::wrapped_scalar_tensor(double(1.) / divFactor_);
    // Divides while copying into the bucket view.
    at::mul_out(bucket_view, grad, wrapped);
  } else {
    bucket_view.copy_(grad); // copy the gradient through bucket_view into the contents of the bucket replica
  }
}
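
Dividing while copying means that a plain sum-allreduce afterwards already yields the average. The sketch below shows that arithmetic with ordinary vectors. It is only an illustration under assumptions: the toy_ functions are hypothetical, the "allreduce" is an in-process element-wise sum, and the divide factor is assumed to equal the number of participating ranks (how divFactor_ is actually chosen is decided by set_divide_factor, which is not shown in this article).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Writes grad / div_factor into bucket[offset, offset + grad.size()).
void toy_copy_grad_to_bucket(const std::vector<double>& grad,
                             std::vector<double>& bucket,
                             std::size_t offset,
                             double div_factor) {
  assert(offset + grad.size() <= bucket.size());
  const double scale = 1.0 / div_factor;
  for (std::size_t i = 0; i < grad.size(); ++i) {
    bucket[offset + i] = grad[i] * scale;  // divide while copying
  }
}

// In-process stand-in for allreduce(SUM): element-wise sum over all ranks' buckets.
std::vector<double> toy_allreduce_sum(const std::vector<std::vector<double>>& buckets) {
  assert(!buckets.empty());
  std::vector<double> total(buckets.front().size(), 0.0);
  for (const auto& bucket : buckets) {
    assert(bucket.size() == total.size());
    for (std::size_t i = 0; i < total.size(); ++i) {
      total[i] += bucket[i];
    }
  }
  return total;
}

// Two simulated ranks with gradients {2, 4} and {4, 8}.
std::vector<double> toy_average_two_ranks() {
  const std::vector<std::vector<double>> grads = {{2.0, 4.0}, {4.0, 8.0}};
  std::vector<std::vector<double>> buckets(grads.size(), std::vector<double>(2, 0.0));
  for (std::size_t rank = 0; rank < grads.size(); ++rank) {
    toy_copy_grad_to_bucket(grads[rank], buckets[rank], 0,
                            static_cast<double>(grads.size()));
  }
  return toy_allreduce_sum(buckets);
}
```

Here the buckets hold {1, 2} and {2, 4} after the copy, and their sum {3, 6} is exactly the mean of the two original gradients.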

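3.2 Bucket ready

As seen in the code above, the Reducer checks whether all gradients in a bucket are ready. If nothing is pending, the bucket is ready too, and mark_bucket_ready is called.

mark_bucket_ready iterates over the buckets and launches the reduction for the buckets that are ready.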

// Called when the bucket at the specified index is ready to be reduced.
void Reducer::mark_bucket_ready(size_t bucket_index) {
  TORCH_INTERNAL_ASSERT(bucket_index >= next_bucket_);

  // Buckets are reduced in sequence. Ignore this bucket if
  // it's not its turn to be reduced.
  if (bucket_index > next_bucket_) {
    return;
  }

  // Keep going, until we either:
  // - have kicked off reduction for all buckets, or
  // - found a bucket that's not yet ready for reduction.
  //
  for (; next_bucket_ < buckets_.size() && buckets_[next_bucket_].pending == 0;
       next_bucket_++) {
    num_buckets_ready_++; // one more bucket is ready
    if (num_buckets_ready_ == 1 && should_collect_runtime_stats()) {
      record_backward_comm_start_time();
    }
    auto& bucket = buckets_[next_bucket_];
    all_reduce_bucket(bucket); // launch the reduction for the ready bucket
  }
}
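
Because buckets are reduced strictly in index order, a bucket that becomes ready early has to wait for its predecessors. The following sketch models only that ordering rule. ToyReducer is a hypothetical, heavily simplified stand-in (one replica per bucket, no tensors, no communication); pushing the index onto launched takes the place of all_reduce_bucket.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Toy model of in-order bucket launching; all names are illustrative.
struct ToyReducer {
  std::vector<int> pending;           // gradients still missing, per bucket
  std::size_t next_bucket = 0;        // next bucket allowed to launch
  std::vector<std::size_t> launched;  // order in which "allreduce" was launched

  explicit ToyReducer(std::vector<int> gradients_per_bucket)
      : pending(std::move(gradients_per_bucket)) {}

  // One gradient of the given bucket has become ready.
  void mark_variable_ready(std::size_t bucket_index) {
    assert(bucket_index < pending.size() && pending[bucket_index] > 0);
    if (--pending[bucket_index] == 0) {
      mark_bucket_ready(bucket_index);
    }
  }

  void mark_bucket_ready(std::size_t bucket_index) {
    assert(bucket_index >= next_bucket);
    if (bucket_index > next_bucket) {
      return;  // not this bucket's turn yet, an earlier bucket is still pending
    }
    // Launch this bucket and any later buckets that were already complete.
    for (; next_bucket < pending.size() && pending[next_bucket] == 0; ++next_bucket) {
      launched.push_back(next_bucket);  // stands in for all_reduce_bucket(bucket)
    }
  }

  bool all_buckets_launched() const { return next_bucket == pending.size(); }
};
```

As a usage example, take two buckets with 1 and 2 gradients. If both gradients of bucket 1 arrive first, nothing is launched; as soon as the single gradient of bucket 0 arrives, buckets 0 and 1 are launched back to back, in that order.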

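3.2.1 all_reduce_bucket

all_reduce_bucket synchronizes contents.

  • Iterate over the replicas of the bucket and insert each replica's tensor into tensors.
  • If no comm_hook is registered, allreduce these tensors directly.
  • If a comm_hook is registered, use the hook to perform the allreduce. Note that comm_hook is only a low-level hook for the communication itself; to clip gradients individually before the reduce, a hook still has to be attached in autograd.
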
void Reducer::all_reduce_bucket(Bucket& bucket) {
  std::vector<at::Tensor> tensors;
  tensors.reserve(bucket.replicas.size());
  for (const auto& replica : bucket.replicas) {
    // TODO(@pietern): Ensure proper synchronization with the CUDA events
    // that recorded copies into this contents tensor. If these copies are
    // executed on non-default streams, the current stream for the device
    // that holds the contents tensor must wait on these events.
    //
    // As long as autograd uses the default stream for every device,
    // these operations are implicitly sequenced, and we don't need to
    // do any extra synchronization here.
    //
    // Operations on the CUDA default stream are already sequenced
    tensors.push_back(replica.contents);
  }
  // See Note [DDP Communication Hook]
  if (comm_hook_ == nullptr) {
    // No comm_hook registered: allreduce directly
    bucket.work = process_group_->allreduce(tensors);
  } else {
    // A comm_hook is registered: use the hook to perform the allreduce.
    // Note that comm_hook is only a low-level hook for the communication itself;
    // to clip gradients individually before the reduce, a hook still has to be
    // attached in autograd.
    GradBucket grad_bucket(
        next_bucket_,
        tensors[0], // as the comment below explains, a bucket has only one replica
        // Since currently we do not support single-process multiple-device
        // mode, we can assume only one replica in the bucket.
        bucket.replicas[0].offsets,
        bucket.replicas[0].lengths,
        bucket.replicas[0].sizes_vec);
    bucket.future_work = comm_hook_->runHook(grad_bucket);
  }
}

The logic extends as follows:

  1. The Reducer registers autograd_hook on the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter is ready, it calls autograd_hook.
  3. Processing continues inside autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients.
  5. A finalize_backward callback is registered with the engine.
  6. GraphTask::exec_post_processing calls finalize_backward.

                                                                               +
                                                                    Worker 1   |   Worker 2
                                                                               |
Engine        AccumulateGrad                Reducer                            |    Reducer
                                                                               |
  +                  +                         +                               |        +
  |                  |                         |                               |        |
  |                  |           1             |                               |        |
  |                  | <-----------------------+                               |        |
  |                  |                                                         |        |
  |                  |                                                         |        |
  |                  v           2                                             |        |
  |             post_hooks  +-------->  autograd_hook                          |        |
  |                                            +                               |        |
  |                                            |                               |        |
  |                                            | 3                             |        |
  |                                            v                               |        |
  |                         +------------------+---------------------------+   |        |
  |                         |    mark_variable_ready                       |   |        |
  |                         |                                              |   |        |
  |                         |                                              |   |        |
  |                         |     All variable in replica are ready?       |   |        |
  |                         |                  +                           |   |        |
  |                         |                  | YES                       |   |        |
  |                         |                  v                           |   |        |
  |                         |     All replica in bucket are ready?         |   |        |
  |                         |                  +                           +   +        |
  |                         |                  | YES                                    |
  |                         |                  v              4   all_reduce_bucket     |
  |                         |          mark_bucket_ready  <----------------+---+----->  |
  |                         |                                              |   |        |
  |                         |                                              |   |        |
  |                         |                                              |   |        |
  |                         |                  +                           |   |        |
  |                         |                  |                           |   |        |
  |                         |                  |                           |   |        |
  |                         |                  v                           |   |        |
  |                         |       All buckets are ready?                 |   |        |
  |                         |                  +                           |   |        |
  |                         |                  | YES                       |   |        |
  |                         |                  v                           |   |        |
  | queue_callback  5       |     all_reduce_local_used_map                |   |        |
  | <----------------------------+  queue_callback(finalize_backward)      |   |        |
  |                         |                                              |   |        |
  |                         |                                              |   |        |
  |                         +------------------+---------------------------+   |        |
  v                                            |                               |        |
                                               |                               |        |
GraphTask::exec_post_processing                |                               |        |
  +                                            |                               |        |
  |                                            |                               |        |
  |              6                             v                               |        |
  +----------------------------------> finalize_backward                       |        |
  |                                            +                               |        |
  |                                            |                               |        |
  |                                            |                               |        |
  v                                            v                               +        v

3.2.2 PythonCommHook

PythonCommHook lets users implement special requirements. It was mentioned in an earlier article; here are two more examples.

PythonCommHook example

c10::intrusive_ptr<c10::ivalue::Future> PythonCommHook::runHook(
    GradBucket& bucket) {
  py::gil_scoped_acquire acquire;

  py::object py_fut = hook_(state_, bucket);

  try {
    return py_fut.cast<std::shared_ptr<torch::jit::PythonFutureWrapper>>()->fut;
  } catch (const py::cast_error& e) {
    auto type = py_fut.get_type();
    auto errMsg = c10::str(
        e.what(),
        ". DDP communication hook's callback must return a "
        "torch.futures.Future or torch._C.Future object, but got ",
        type.attr("__module__").cast<std::string>(),
        ".",
        type.attr("__qualname__").cast<std::string>());
    throw std::runtime_error(errMsg);
  }
}

Or:

c10::intrusive_ptr<c10::ivalue::Future> AllReduceCommHook::runHook(
    GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getTensorRef()};
  auto allreduce_work = state_->allreduce(tensors);

  // FIXME Access the result through the Future passed as argument, instead of
  // capturing the Work.
  auto div_by_process_group_size = [allreduce_work,
                                    this](c10::ivalue::Future& /* unused */) {
    auto tensor = allreduce_work->result()[0] / state_->getSize();
    return c10::IValue(tensor);
  };

  auto fut = allreduce_work->getFuture();
  return fut->then(div_by_process_group_size, fut->elementType());
}

3.2.3 GradBucket

GradBucket is the class used to pass the bucket's information to the communication hook.

// This class passes bucket contents tensor to DDP communication hook.
class GradBucket {
 public:
  explicit GradBucket(
      size_t index,
      const at::Tensor& tensor,
      const std::vector<size_t>& offsets,
      const std::vector<size_t>& lengths,
      const std::vector<c10::IntArrayRef>& sizes_vec)
      : index_(index),
        tensor_(tensor),
        offsets_(offsets),
        lengths_(lengths),
        sizes_vec_(sizes_vec) {}

  // Returns the index of the bucket, which is unique across all the buckets.
  size_t getIndex() const {
    return index_;
  }

  const at::Tensor& getTensor() const {
    return tensor_;
  }

  // Returns a mutable tensor compared with the above method.
  at::Tensor& getTensorRef() {
    return tensor_;
  }

  // Overwrites tensors at a specific index.
  void setTensor(at::Tensor& tensor) {
    tensor_ = tensor;
  }

  // Each tensor in the list that getPerParameterTensors corresponds to a
  // parameter.
  std::vector<at::Tensor> getPerParameterTensors() const;

  // Returns whether this bucket is the last bucket to allreduce in an iteration.
  bool isTheLastBucketToAllreduce() const {
    return index_ == 0;
  }

 private:
  size_t index_;
  at::Tensor tensor_;

  // Per-variable info in tensors_[0].
  std::vector<size_t> offsets_;
  std::vector<size_t> lengths_;
  std::vector<c10::IntArrayRef> sizes_vec_;
};
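
To make the role of offsets_ and lengths_ concrete, here is a simplified sketch of how one flat buffer can be sliced into per-parameter views without copying, in the spirit of getPerParameterTensors. ParamView and FlatGradBucket are hypothetical types invented for this illustration. Shapes (sizes_vec_) are left out, and the body of the real getPerParameterTensors is not shown in this article, so treat this only as an approximation of the idea.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical view type: a pointer into the flat buffer plus a length.
struct ParamView {
  float* data;
  std::size_t length;
};

// Simplified stand-in for GradBucket: one flat buffer plus per-parameter
// offsets and lengths (tensor shapes are omitted in this sketch).
struct FlatGradBucket {
  std::vector<float> flat;
  std::vector<std::size_t> offsets;
  std::vector<std::size_t> lengths;

  // One view per parameter. Nothing is copied, so writing through a view
  // changes the flat buffer itself.
  std::vector<ParamView> per_parameter_views() {
    std::vector<ParamView> views;
    views.reserve(offsets.size());
    for (std::size_t i = 0; i < offsets.size(); ++i) {
      views.push_back({flat.data() + offsets[i], lengths[i]});
    }
    return views;
  }
};
```

With a bucket holding two parameters of 2 and 3 elements (offsets {0, 2}, lengths {2, 3}), the second view starts at the third element of the flat buffer, and a write through either view is visible in the buffer that gets allreduced.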

3.3 all_reduce_local_used_map

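Note that what is reduced here is local_used_maps_, the record of which parameters were used, not the gradients of the tensors.

3.3.1 Definition

Let us recall the definition.

The following two variables record which parameters were used locally. They indicate whether each parameter was used on this process during the current iteration, or during the current no_sync session if no_sync is on (that is, while gradient synchronization is disabled).

Each model replica corresponds to one tensor in the map, and each tensor is a one-dimensional int32 tensor whose length is the number of parameters.

These tensors are marked in autograd_hook to indicate that the corresponding parameter has been used. They are allreduced at the end of the backward pass of the current iteration or no_sync session, in order to work out which parameters are globally unused.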

// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_: CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_; // set in autograd_hook; also described in the paper
std::vector<at::Tensor> local_used_maps_dev_; // GPU

3.3.2 Synchronization

all_reduce_local_used_map uses an asynchronous H2D (host-to-device) copy to avoid blocking overhead: it copies local_used_maps_ into local_used_maps_dev_ and then allreduces local_used_maps_dev_.

void Reducer::all_reduce_local_used_map() {
  // See Note [Skip allreducing local_used_maps_dev]
  // H2D from local_used_maps_ to local_used_maps_dev_
  for (size_t i = 0; i < local_used_maps_.size(); i++) {
    if (local_used_maps_dev_[i].is_cuda()) {
      // Note [local_used_maps_ -> local_used_maps_dev copying]
      // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      // We do async H2D to avoid the blocking overhead. The async copy and
      // allreduce respect the current stream, so will be sequenced
      // correctly.
      //
      // Correct sequencing with respect to host operations is also
      // essential. The H2D copy_ is stream ordered, while the host's
      // changes to local_used_maps_ are host ordered. If a large backlog of
      // cuda-stream work pushes the copy_ far into the future, and if no
      // blocking calls occur between now and finalize_backward()** such
      // that finalize_backward() re-zeroes local_used_maps_ on the host
      // before the stream executes the copy_, copy_ will read those zeros
      // instead of the values we thought we told it to read here. Copying
      // local_used_maps_[i] to a pinned temporary (which the pinned caching
      // allocator should supply asynchronously) avoids this nasty, rare
      // race condition.
      //
      // ** In the hoped-for case where all params are used, DDP itself
      // won't do any blocking work between now and the re-zeroing, so the
      // danger is real.
      //
      // Defensively ensures local_used_maps_tmp is distinct from
      // local_used_maps_[i]
      auto local_used_maps_tmp = at::native::empty_like(
          local_used_maps_[i],
          optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
          local_used_maps_[i].options().layout_opt(),
          local_used_maps_[i].options().device_opt(),
          true /* pinned_memory */);
      // Paranoid asserts here because in some workloads, the pinned
      // allocator behaves in a way we don't understand, and may be bugged.
      // See https://github.com/pytorch/pytorch/pull/54474
      TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
      TORCH_INTERNAL_ASSERT(
          local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
      local_used_maps_tmp.copy_(local_used_maps_[i]);
      local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
    } else {
      local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
    }
  }
  local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}
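
What the allreduce of the usage maps buys us can be sketched in a few lines: after a sum across workers, a parameter whose entry is still 0 was not used by any worker. The sketch below simulates the collective on a single process. UsedMap, allreduce_used_maps and globally_unused are hypothetical names, the reduction is assumed to be a sum, and all maps are assumed to have the same length.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One int32 flag per parameter: 1 if the parameter received a gradient on
// this worker during the current iteration, 0 otherwise.
using UsedMap = std::vector<std::int32_t>;

// Simulated allreduce(SUM) over the maps of all workers.
// Assumption: every map has one entry per parameter, in the same order.
UsedMap allreduce_used_maps(const std::vector<UsedMap>& per_worker) {
  if (per_worker.empty()) {
    return {};
  }
  UsedMap total(per_worker.front().size(), 0);
  for (const UsedMap& m : per_worker) {
    for (std::size_t i = 0; i < total.size(); ++i) {
      total[i] += m[i];
    }
  }
  return total;
}

// A parameter is globally unused when no worker marked it as used.
std::vector<std::size_t> globally_unused(const UsedMap& reduced) {
  std::vector<std::size_t> unused;
  for (std::size_t i = 0; i < reduced.size(); ++i) {
    if (reduced[i] == 0) {
      unused.push_back(i);
    }
  }
  return unused;
}
```

For example, with two workers whose maps are {1, 0, 0} and {1, 1, 0}, parameter 1 is unused locally on the first worker but used globally, and only parameter 2 is globally unused.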

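The flow so far extends as follows:

  1. The Reducer registers autograd_hook on the post_hooks of AccumulateGrad.
  2. During the backward pass, when the autograd engine finds that a parameter's gradient is ready, it calls autograd_hook.
  3. Processing continues inside autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients.
  5. allreduce is called to reduce the local_used_maps_ variable.
  6. finalize_backward is registered with the engine as a callback.
  7. finalize_backward is called from GraphTask::exec_post_processing.
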
                                                                             +
                                                                  Worker 1   |   Worker 2
                                                                             |
  Engine    AccumulateGrad                Reducer                            |     Reducer
                                                                             |
    +              +                         +                               |        +
    |              |                         |                               |        |
    |              |            1            |                               |        |
    |              | <-----------------------+                               |        |
    |              |                                                         |        |
    |              |                                                         |        |
    |              |                                                         |        |
    |              |                                                         |        |
    |              v                                                         |        |
    |                               2                                        |        |
    |         post_hooks  +-------->  autograd_hook                          |        |
    |                                        +                               |        |
    |                                        |                               |        |
    |                                        |  3                            |        |
    |                                        v                               |        |
    |                     +------------------+---------------------------+   |        |
    |                     |    mark_variable_ready                       |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |  All variable in replica are ready?          |   |        |
    |                     |                  +                           |   |        |
    |                     |                  | YES                       |   |        |
    |                     |                  v                           |   |        |
    |                     |  All replica in bucket are ready?            |   |        |
    |                     |                  +                           +   +        |
    |                     |                  | YES              4  all_reduce_bucket  |
    |                     |                  v                                        |
    |                     |          mark_bucket_ready  <----------------+---+------> |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |                  +                           |   |        |
    |                     |                  |                           |   |        |
    |                     |                  |                           |   |        |
    |                     |                  v                           |   |        |
    |                     |       All buckets are ready?                 |   |        |
    |                     |                  +                           |   |        |
    |                     |                  | YES                       +   +        |
    |                     |                  v                     5  allreduce       |
    |   6  queue_back     |  all_reduce_local_used_map  <----------------+---+------> |
    | <-------------------+  queue_callback(finalize_backward)           |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     +------------------+---------------------------+   |        |
    v                                        |                               |        |
                                             |                               |        |
GraphTask::exec_post_processing              |                               |        |
    +                                        |                               |        |
    |                                        |                               |        |
    |                                        v                               |        |
    +------------------------------> finalize_backward                       |        |
    |              7                         +                               |        |
    |                                        |                               |        |
    |                                        |                               |        |
    v                                        v                               +        v

3.4 finalize_backward

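finalize_backward does the wrap-up work. Its logic is:

  • Iterate over the buckets; for each bucket:
    • Wait for the asynchronous reduction of its tensor to complete.
    • If a communication hook is registered, copy the future's result back into contents.
  • Wait for the reduction of local_used_maps_dev_ to complete.
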
void Reducer::finalize_backward() {
  // No longer expect autograd hooks to fire after this function returns.
  expect_autograd_hooks_ = false;
  // No longer require call to finalize after this function returns.
  require_finalize_ = false;

  // Unset allreduce division factor, as it may change in next backwards pass
  // when running with DDP join mode.
  divFactor_ = kUnsetDivFactor;

  // Wait for asynchronous reduction to complete and unflatten contents.
  for (auto& bucket : buckets_) { // iterate over the buckets
    // See Note [DDP Communication Hook]
    if (comm_hook_ == nullptr) {
      bucket.work->wait(); // wait for the allreduce to complete
    } else {
      bucket.future_work->wait(); // wait for the hook's future to complete

      auto future_result =
          comm_hook_->parseHookResult(bucket.future_work->value());

      for (size_t i = 0; i < future_result.size(); i++) {
        auto& replica = bucket.replicas[i];
        if (bucket.expect_sparse_gradient) {
          // copy the future's result back into contents
          replica.contents.copy_(future_result[i]);
        } else {
          // Reinitialize only `bucket_views_out` with the future_result by
          // following the same logic in `initialize_buckets`.
          // Rebuild bucket_views_out as views into future_result[i].
          populate_bucket_views_out(replica, future_result[i]);
        }
      }
    }
    if (!bucket.expect_sparse_gradient) {
      // We don't need to finalize the sparse bucket since the sparse grad and
      // the bucket essentially point to the same storage. As a result, once
      // the allreduce is done, the sparse grads are automatically updated.
      finalize_bucket_dense(bucket);
    }
  }

  // See Note [Skip allreducing local_used_maps_dev]
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Due to the lazy wait, it is possible that reduction of the current
    // iteration is still going when the one for next iteration gets kicked off.
    // For such case, we want to wait explicitly to make sure the reduction does
    // complete before kicking off next one. Otherwise the previous one may
    // interfere, write to the device-side memory and clobber the content of
    // local_used_maps_dev_.
    if (!local_used_maps_reduced_) {
      local_used_work_->wait(); // wait for the local_used_maps_dev_ reduction
    }
  }

  if (dynamic_graph_find_unused()) {
    // Reset unused parameter accounting.
    // See Note [local_used_maps_ -> local_used_maps_dev copying]
    for (auto& local_used : local_used_maps_) {
      local_used.fill_(0);
    }
    local_used_maps_reduced_ = false;
  }

  if (should_collect_runtime_stats()) {
    record_backward_comm_end_time();
  }
}

This process uses the following functions.

3.4.1 populate_bucket_views_out

populate_bucket_views_out rebuilds the output views (bucket_views_out) over the given flat tensor.

// (see Note:  "Gradient Layout Contract" in initialize_buckets).
void Reducer::populate_bucket_views_out(
    Reducer::BucketReplica& replica,
    at::Tensor& tensor) { // build bucket_views_out as views into `tensor`
  replica.bucket_views_out.clear(); // drop the old views
  for (size_t i = 0; i < replica.variables.size(); i++) { // rebuild the views
    const auto& v = replica.variables[i]; // i-th variable of the replica
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) {
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_out.push_back( // view with the param's own strides
          tensor.as_strided(v.sizes(), v.strides(), offset));
    } else {
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_out.push_back( // contiguous view of the slice
          tensor.narrow(0, offset, length).view(v.sizes()));
    }
  }
}

3.4.2 finalize_bucket_dense

finalize_bucket_dense calls runGradCallbackForVariable or copy_bucket_to_grad to copy the reduced gradients back to the autograd engine.

// A bucket with one or more dense tensors needs to be unflattened.
void Reducer::finalize_bucket_dense(Bucket& bucket) {
  for (size_t replica_index = 0; replica_index < bucket.replicas.size();
       replica_index++) {
    auto& replica = bucket.replicas[replica_index];
    for (size_t intra_bucket_index = 0;
         intra_bucket_index < replica.variables.size();
         intra_bucket_index++) {
      auto& variable = replica.variables[intra_bucket_index];
      const auto offset = replica.offsets[intra_bucket_index];
      const auto length = replica.lengths[intra_bucket_index];

      bool global_unused = false;
      // See Note [Skip allreducing local_used_maps_dev]
      if (static_graph_ || find_unused_parameters_) {
        // Determine if this param has been used globally or not.
        //
        // If the variable was used locally, it is also used globally and then
        // we don't need to wait for the reduction. Otherwise we lazily wait for
        // the reduction to complete, only when we see a variable that was
        // unused locally. Then we end up delaying the synchronization point
        // that local_used_work_->wait() implies. If we don't have any unused
        // parameters at all, we can skip waiting for the work to complete
        // altogether, and cause negligible performance overhead for models
        // where all parameters are used. Such lazily waiting means minimizing
        // performance impact for the big majority of models where all
        // parameters are always used. Then we only pay the overhead cost if
        // there is indeed a parameter that is locally unused, because we need
        // to check if it's also globally unused.
        size_t variable_index = bucket.variable_indices[intra_bucket_index];
        // Note: global_unused might not be global yet. As we lazily wait for
        // the reduction to complete, it becomes really global only if we get to
        // the point as below where we wait for the reduction work, make D2H
        // copy, and update global_unused with the real global consensus, i.e.
        // local_used_maps_reduced_ is true.
        global_unused =
            local_used_maps_[replica_index][variable_index].item<int>() == 0;
        if (global_unused && !local_used_maps_reduced_) {
          // Wait for local_used_maps reduction to complete.
          local_used_work_->wait();
          // D2H from local_used_maps_dev_ to local_used_maps_
          for (size_t i = 0; i < local_used_maps_.size(); i++) {
            // Blocking copy, if local_used_maps_dev_ is cuda
            local_used_maps_[i].copy_(local_used_maps_dev_[i]);
          }
          global_unused =
              local_used_maps_[replica_index][variable_index].item<int>() == 0;
          local_used_maps_reduced_ = true;
        }
      }

      if (!gradient_as_bucket_view_) {
        copy_bucket_to_grad( // copy the bucket view back into the gradient
            variable, replica, intra_bucket_index, global_unused);
      } else {
        const auto& bucket_view_out =
            replica.bucket_views_out[intra_bucket_index];
        auto& bucket_view_in = replica.bucket_views_in[intra_bucket_index];
        // If communication_hook is registered, bucket_view_out stores
        // allreduced results in a newly allocated tensor, copy bucket_view_out
        // back to bucket_view_in that referring to replica.content tensor and
        // grad.
        if (!bucket_view_in.is_alias_of(bucket_view_out)) {
          bucket_view_in.copy_(bucket_view_out); // copy the out view into the in view
        }
        runGradCallbackForVariable(variable, [&](auto& grad) {
          // If a parameter is globally unused, we keep its grad untouched.
          if (!global_unused) {
            // If grad is globally used but locally unused, let grad point to
            // bucket_view_in
            if (!grad.defined()) {
              grad = bucket_view_in;
            } else {
              if (!grad.is_alias_of(bucket_view_in)) {
                TORCH_CHECK(
                    false,
                    "Detected at least one parameter gradient is not the "
                    "expected DDP bucket view with gradient_as_bucket_view=True. "
                    "This may happen (for example) if multiple allreduce hooks "
                    "were registered onto the same parameter. If you hit this error, "
                    "please file an issue with a minimal repro.");
              }
            }
            // The grad is modified and needs to be written back.
            return true;
          }
          // The grad is not modified.
          return false;
        });
      }
    }
  }
}
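
The long comment in finalize_bucket_dense describes a lazy wait: DDP only blocks on the usage-map reduction when it meets a parameter that is unused locally. The sketch below isolates that control flow. LazyUsedMap and its members are hypothetical. The asynchronous work and the D2H copy are replaced by a counter and a plain vector assignment, so this is only a model of the decision logic, not of the real synchronization.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model of the lazy wait on the usage-map reduction.
struct LazyUsedMap {
  std::vector<int> local;        // host-side flags (analogue of local_used_maps_)
  std::vector<int> reduced_dev;  // what the allreduce would produce on the device
  bool reduced = false;          // analogue of local_used_maps_reduced_
  int waits = 0;                 // how many times we had to block

  LazyUsedMap(std::vector<int> local_flags, std::vector<int> reduced_flags)
      : local(std::move(local_flags)), reduced_dev(std::move(reduced_flags)) {}

  // Returns true if parameter `idx` is unused on every worker.
  bool is_globally_unused(std::size_t idx) {
    bool unused = local[idx] == 0;
    if (unused && !reduced) {
      ++waits;              // stands in for local_used_work_->wait()
      local = reduced_dev;  // stands in for the blocking D2H copy
      reduced = true;
      unused = local[idx] == 0;  // re-check against the global result
    }
    return unused;
  }
};
```

A parameter that was used locally is answered without blocking at all. The first locally unused parameter triggers exactly one wait, and later queries reuse the reduced map. This is why models in which every parameter is used pay almost nothing for the bookkeeping.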

3.4.3 copy_bucket_to_grad

This function copies the data from the bucket back into the corresponding gradient held by the autograd engine.

void Reducer::copy_bucket_to_grad(
    at::Tensor& variable,
    Reducer::BucketReplica& replica,
    size_t intra_bucket_index,
    bool global_unused) {
  // get the output view for this variable
  const auto& bucket_view = replica.bucket_views_out[intra_bucket_index];
  runGradCallbackForVariable(variable, [&](auto& grad) {
    // If a parameter is globally unused, we keep its grad untouched.
    if (!global_unused) {
      if (!grad.defined()) {
        // Creates grad according to the "Gradient Layout Contract"
        // (see torch/csrc/grad/AccumulateGrad.h)
        grad =
            torch::autograd::utils::clone_obey_contract(bucket_view, variable);
      } else {
        grad.copy_(bucket_view); // copy from the bucket back into the gradient
      }
      // The grad is modified and needs to be written back.
      return true;
    }
    // The grad is not modified.
    return false;
  });
}
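
The callback passed to runGradCallbackForVariable returns a bool that says whether the gradient was modified and needs to be written back. A minimal sketch of that contract follows. Grad, run_grad_callback and copy_bucket_to_grad_sketch are hypothetical names. We assume here that the callback works on a copy that is committed only when it returns true, which is one way to realize "needs to be written back" and is not necessarily how PyTorch implements it.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical gradient holder: `defined` mirrors grad.defined().
struct Grad {
  bool defined;
  std::vector<float> values;
};

// Assumed analogue of runGradCallbackForVariable: the callback edits a
// working copy, and the copy is committed only if the callback returns true.
void run_grad_callback(Grad& stored, const std::function<bool(Grad&)>& cb) {
  Grad working = stored;
  if (cb(working)) {
    stored = working;
  }
}

// Mirrors the branches of copy_bucket_to_grad on plain vectors.
void copy_bucket_to_grad_sketch(
    Grad& grad,
    const std::vector<float>& bucket_view,
    bool global_unused) {
  run_grad_callback(grad, [&](Grad& g) -> bool {
    if (global_unused) {
      return false;  // globally unused: keep the gradient untouched
    }
    g.values = bucket_view;  // covers both "create" and "overwrite"
    g.defined = true;
    return true;  // modified, so it must be written back
  });
}
```

With global_unused set to true the stored gradient stays undefined. With global_unused set to false it ends up holding the reduced values from the bucket view.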

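At this point, the flow extends as follows:

  1. The Reducer registers autograd_hook on the post_hooks of AccumulateGrad.
  2. During the backward pass, when the autograd engine finds that a parameter's gradient is ready, it calls autograd_hook.
  3. Processing continues inside autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients.
  5. allreduce is called to reduce the local_used_maps_ variable.
  6. finalize_backward is registered with the engine as a callback.
  7. finalize_backward is called from GraphTask::exec_post_processing.
  8. wait is called to synchronize with the other workers.
  9. copy_bucket_to_grad is called to copy from the bucket back into the corresponding gradient in the autograd engine.

We now have the complete picture of how the autograd engine interacts with DDP during the backward pass, and how gradients are reduced through DDP while the backward computation is still running.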

                                                                             +
                                                                  Worker 1   |   Worker 2
                                                                             |
  Engine    AccumulateGrad                Reducer                            |     Reducer
                                                                             |
    +              +                         +                               |        +
    |              |                         |                               |        |
    |              |            1            |                               |        |
    |              | <-----------------------+                               |        |
    |              |                                                         |        |
    |              |                                                         |        |
    |              v                                                         |        |
    |                               2                                        |        |
    |         post_hooks  +-------->  autograd_hook                          |        |
    |                                        +                               |        |
    |                                        |                               |        |
    |                                        |  3                            |        |
    |                                        v                               |        |
    |                     +------------------+---------------------------+   |        |
    |                     |    mark_variable_ready                       |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |  All variable in replica are ready?          |   |        |
    |                     |                  +                           |   |        |
    |                     |                  | YES                       |   |        |
    |                     |                  v                           |   |        |
    |                     |  All replica in bucket are ready?            |   |        |
    |                     |                  +                           +   +        |
    |                     |                  | YES              4  all_reduce_bucket  |
    |                     |                  v                                        |
    |                     |          mark_bucket_ready  <----------------+---+------> |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     |                  +                           |   |        |
    |                     |                  |                           |   |        |
    |                     |                  |                           |   |        |
    |                     |                  v                           |   |        |
    |                     |       All buckets are ready?                 |   |        |
    |                     |                  +                           |   |        |
    |                     |                  | YES                       +   +        |
    |                     |                  v                     5  allreduce       |
    |   6  queue_back     |  all_reduce_local_used_map  <----------------+---+------> |
    | <-------------------+  queue_callback(finalize_backward)           |   |        |
    |                     |                                              |   |        |
    |                     |                                              |   |        |
    |                     +------------------+---------------------------+   |        |
    v                                        |                               |        |
                                             |                               |        |
GraphTask::exec_post_processing              |                               |        |
    +                                        |                               |        |
    |                                        |                               |        |
    |              7                         v                               |        |
    +------------------------------> finalize_backward                       |        |
    |                                        +          8  wait              |        |
    |                                        |  <-----------------------------------> |
    | <-------------------------------------+|                               |        |
    v     copy_bucket_to_grad  9             v                               +        v

This concludes the analysis of the backward pass, and with it the whole analysis of DDP. Next we will analyze distributed autograd.
