Skip to content

Commit

Permalink
Bulked op segments to allow Variable nodes (apache#14200)
Browse files Browse the repository at this point in the history
* Bulked op seg size to ignore Variable nodes, limited by MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_{FWD,BWD}.

* Document new env variables. Unify operation with imperative.

* Add timing-based tests of symbol and gluon op bulking.

* Rename test_in_separate_process -> run_in_spawned_process.

* Remove redundant util test_operator_gpu.py:_test_in_separate_process().

* Consolidate references to env vars that set op-bulking policy.

* Test for effect of MXNET_EXEC_BULK_EXEC_TRAIN=0.

* Fix python2 print() issue.

* Trigger CI.

* Consolidate similar op bulking routines.

* Trigger CI.

* Trigger CI.

* Add instrumentation to debug failing CI.
  • Loading branch information
DickJC123 authored and haohuw committed Jun 23, 2019
1 parent 611fb53 commit 208e76b
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 111 deletions.
8 changes: 7 additions & 1 deletion docs/faq/env_var.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- If set to `1`, during training MXNet executes the computation graph as several subgraphs in bulk mode.
* MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN
- Values: Int ```(default=15)```
- The maximum number of nodes in the subgraph executed in bulk during training(not inference). Setting this to a larger number may reduce the degree of parallelism for multi-GPU training.
- The maximum number of nodes in the subgraph executed in bulk during training (not inference). Setting this to a larger number may reduce the degree of parallelism for multi-GPU training.
* MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD
- Values: Int ```(default=<value of MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN>)```
- The maximum number of nodes in the subgraph executed in bulk during training (not inference) in the forward pass.
* MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD
- Values: Int ```(default=<value of MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN>)```
- The maximum number of nodes in the subgraph executed in bulk during training (not inference) in the backward pass.

## Control the Data Communication

Expand Down
23 changes: 20 additions & 3 deletions include/mxnet/imperative.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,31 @@ class Imperative {
bool create_graph);
/*! \return AutogradRuntime singleton */
static Imperative* Get();
/*!
 * \brief Whether op execution bulking should be used during inference.
 * \return the value read from MXNET_EXEC_BULK_EXEC_INFERENCE, with `true` passed as the default.
 */
static bool PreferBulkExecInference() {
  const bool bulk_for_inference = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_INFERENCE", true);
  return bulk_for_inference;
}
/*!
 * \brief Whether op execution bulking should be used during training.
 * \return the value read from MXNET_EXEC_BULK_EXEC_TRAIN, with `true` passed as the default.
 */
static bool PreferBulkExecTrain() {
  const bool bulk_for_training = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_TRAIN", true);
  return bulk_for_training;
}
/*!
 * \brief Upper bound on the number of op nodes in one bulk during the forward pass of training.
 * \return MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD, using MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN
 *         (itself defaulted to 15) as the default value.
 */
static int BulkExecMaxNodeTrainFwd() {
  // Read the shared training limit first; it serves as the default for the forward-only setting.
  const int shared_train_limit = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15);
  return dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD", shared_train_limit);
}
/*!
 * \brief Upper bound on the number of op nodes in one bulk during the backward pass of training.
 * \return MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD, using MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN
 *         (itself defaulted to 15) as the default value.
 */
static int BulkExecMaxNodeTrainBwd() {
  // Read the shared training limit first; it serves as the default for the backward-only setting.
  const int shared_train_limit = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15);
  return dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD", shared_train_limit);
}

private:
friend class NDArray;
/*! \brief Private constructor; the singleton instance is obtained through Get(). */
Imperative() {
if (dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_TRAIN", 1)) {
backward_bulk_size_ = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15);
}
// backward_bulk_size_ is only assigned here when bulking during training is enabled.
if (PreferBulkExecTrain())
backward_bulk_size_ = BulkExecMaxNodeTrainBwd();
}
/*! \brief find the input/output ndarrays that are needed for backward */
void GetBackwardDependency(
Expand Down
106 changes: 25 additions & 81 deletions src/executor/graph_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1191,105 +1191,49 @@ void GraphExecutor::InitOpSegs() {
cached_seg_opr_.resize(total_num_nodes, p);
if (monitor_callback_) return;

// Symbolic bulking is set by the same environment variables as Imperative bulking.
// Generate segments based on the graph structure
bool prefer_bulk_exec_inference = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_INFERENCE", true);
bool prefer_bulk_exec_inference = Imperative::PreferBulkExecInference();
// Whether to perform bulk exec for training
const profiler::Profiler *prof = profiler::Profiler::Get();
bool prefer_bulk_exec = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_TRAIN", 1)
&& (!prof || !prof->AggregateEnabled());
bool prefer_bulk_exec_train = Imperative::PreferBulkExecTrain()
&& (!prof || !prof->AggregateEnabled());

bool is_training = num_forward_nodes_ != total_num_nodes;

if (prefer_bulk_exec && is_training) {
this->BulkTrainingOpSegs(total_num_nodes);
if (prefer_bulk_exec_train && is_training) {
// Bulk the forward portion of the graph per the bulk segment max size for forward training
this->BulkOpSegs(0, num_forward_nodes_, Imperative::BulkExecMaxNodeTrainFwd());
// Bulk the backward portion of the graph per the bulk segment max size for backward training
this->BulkOpSegs(num_forward_nodes_, total_num_nodes, Imperative::BulkExecMaxNodeTrainBwd());
}

if (prefer_bulk_exec_inference && !is_training) {
this->BulkInferenceOpSegs();
// Bulk the entire graph as one bulk segment if possible
this->BulkOpSegs(0, total_num_nodes, total_num_nodes);
}
}


// Partition a training graph into bulk-execution segments. The forward nodes
// [0, num_forward_nodes_) and the backward nodes [num_forward_nodes_, total_num_nodes)
// are segmented separately. Each segment's cached operator is stored in cached_seg_opr_
// at the index of the segment's first node.
// NOTE(review): the second argument of CreateCachedSegOpr looks like an exclusive end
// index (the node that closes a segment is not part of it) - confirm against its definition.
void GraphExecutor::BulkTrainingOpSegs(size_t total_num_nodes) {
// The maximum number of nodes in a segment executed in bulk
size_t num_nodes_threshold = dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15);

// create forward segments for training
size_t topo_start = 0;
for (size_t nid = 0; nid < num_forward_nodes_; nid++) {
auto &node = graph_.indexed_graph()[nid].source;
auto &op_node = op_nodes_[nid];
// check if the segment relies on external input, or exceeds the maximum number of nodes,
// or requires async ops
if (node->is_variable() || nid - topo_start > num_nodes_threshold ||
op_node.exec->exec_type() != ExecType::kSync) {
// create a new segment for the previous nodes if the current one cannot be bulked
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, nid);
// the next candidate segment starts after the node that closed this one
topo_start = nid + 1;
}
}
// the last segment
if (topo_start != num_forward_nodes_) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, num_forward_nodes_);
}

// create backward segments for training
// get all gradient variables
std::unordered_set<engine::VarHandle> grad_vars;
for (auto &kv : grad_store_) {
grad_vars.insert(kv.second.var());
}
auto &idx = graph_.indexed_graph();
topo_start = num_forward_nodes_;
for (size_t nid = num_forward_nodes_; nid < total_num_nodes; nid++) {
auto &op_node = op_nodes_[nid];
// nodes that are skipped or have no executor neither close nor restart a segment
if (op_node.skip_exec_node || op_node.exec == nullptr) {
continue;
}
// same segment-closing conditions as in the forward loop above
if (idx[nid].source->is_variable() || nid - topo_start > num_nodes_threshold ||
op_node.exec->exec_type() != ExecType::kSync) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, nid);
topo_start = nid + 1;
} else {
// If it produces output gradient, don't include it in the segment
bool output_gradient = false;
for (auto &out_arr : op_node.exec->out_array) {
if (grad_vars.find(out_arr.var()) != grad_vars.end()) {
output_gradient = true;
}
}
if (output_gradient) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, nid);
topo_start = nid + 1;
}
}
}
// last segment for backward
if (topo_start < total_num_nodes) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, total_num_nodes);
}
}

void GraphExecutor::BulkInferenceOpSegs() {
// Attempt to bulk the whole graph for inference. We will only create new segments when
// required for non-kSync operations.
size_t topo_start = 0;
for (size_t nid = 0; nid < num_forward_nodes_; nid++) {
void GraphExecutor::BulkOpSegs(size_t from_node, size_t up_to_node, size_t segment_num_nodes_max) {
size_t topo_start = from_node;
size_t segment_node_count = 0;
for (size_t nid = from_node; nid < up_to_node; nid++) {
auto &node = graph_.indexed_graph()[nid].source;
auto &op_node = op_nodes_[nid];

// Variables do not need to be segmented at inference time.
if (node->is_variable()) continue;

if (op_node.exec->exec_type() != ExecType::kSync) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, nid);
// Variables, such as learned weights, are ignored in the segment_node_count
bool ignore_node = node->is_variable() || op_node.skip_exec_node || op_node.exec == nullptr;
if (!ignore_node)
segment_node_count++;
bool can_bulk = ignore_node || op_node.exec->exec_type() == ExecType::kSync;
// check if we need to create the segment based on properties of this node
if (!can_bulk || nid == up_to_node - 1 || segment_node_count >= segment_num_nodes_max) {
// Create a new segment for the previous nodes; also include this node if it's bulkable
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, can_bulk ? nid + 1 : nid);
topo_start = nid + 1;
segment_node_count = 0;
}
}
// The last segment
if (topo_start != num_forward_nodes_) {
cached_seg_opr_[topo_start] = this->CreateCachedSegOpr(topo_start, num_forward_nodes_);
}
}

void GraphExecutor::ExecuteMonInputCallback(size_t nid) {
Expand Down
6 changes: 2 additions & 4 deletions src/executor/graph_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,8 @@ class GraphExecutor : public Executor {
void ExecuteMonInputCallback(size_t nid);
// run the monitor callback for output of node `nid`
void ExecuteMonOutputCallback(size_t nid);
// peform bulking and segmentation on an inference graph
void BulkInferenceOpSegs();
// perform bulking and segmentation on a training graph
void BulkTrainingOpSegs(size_t total_num_nodes);
// perform bulking and segmentation on the region [from_node, up_to_node) of a graph;
// segment_num_nodes_max is the maximum number of counted nodes per segment
void BulkOpSegs(size_t from_node, size_t up_to_node, size_t segment_num_nodes_max);
// indicate whether there is a backward graph for gradients.
bool need_grad_;
// internal graph
Expand Down
11 changes: 10 additions & 1 deletion src/imperative/cached_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,18 @@ void CachedOp::StaticInitExec(
SetupOpExec(g, i, state.execs[i], state.arrays, state.array_reqs);
}

// Init bulk_size for Inference mode with bulking enabled (= entire forward graph).
size_t bulk_size = idx.num_nodes();
if (recording || keep_fwd) {
bulk_size = keep_fwd ? config_.backward_bulk_size : config_.forward_bulk_size;
// Training mode
if (!Imperative::PreferBulkExecTrain())
bulk_size = 0;
else
bulk_size = keep_fwd ? config_.backward_bulk_size : config_.forward_bulk_size;
} else {
// Inference mode
if (!Imperative::PreferBulkExecInference())
bulk_size = 0;
}

CreateEngineOpSeg(idx, default_ctx, start_nid, end_nid, bulk_size,
Expand Down
4 changes: 2 additions & 2 deletions src/imperative/cached_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ struct CachedOpConfig : public dmlc::Parameter<CachedOpConfig> {
.set_default(2)
.describe("Maximum number of operators that can be inlined.");
DMLC_DECLARE_FIELD(forward_bulk_size)
.set_default(dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15))
.set_default(Imperative::BulkExecMaxNodeTrainFwd())
.describe("Segment size of bulk execution during forward pass.");
DMLC_DECLARE_FIELD(backward_bulk_size)
.set_default(dmlc::GetEnv("MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN", 15))
.set_default(Imperative::BulkExecMaxNodeTrainBwd())
.describe("Segment size of bulk execution during backward pass.");
DMLC_DECLARE_FIELD(data_indices)
.set_default(nnvm::Tuple<uint32_t>())
Expand Down
78 changes: 78 additions & 0 deletions tests/python/gpu/test_gluon_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
sys.path.insert(0, os.path.join(curr_path, '../unittest'))
from common import setup_module, with_seed, teardown, assert_raises_cudnn_not_satisfied
from common import run_in_spawned_process
from test_gluon import *
from test_loss import *
from test_gluon_rnn import *
Expand Down Expand Up @@ -408,6 +409,83 @@ def tensor_size(big_tensor_bytes):
# Evaluate model
net(data_in).asnumpy()

# Isolated op-bulking timing routine, meant to be invoked under different env var settings.
def _test_bulking_in_process(seed, time_per_iteration):
    """Time forward+backward passes over a long chain of flip ops.

    The mean wall-clock seconds per timed iteration is written to
    `time_per_iteration.value`. `seed` is accepted but not used here.
    """
    # flip is a simple op with same-sized I/O that is unlikely to ever be fused.
    class Flip(gluon.HybridBlock):
        def __init__(self, **kwargs):
            super(Flip, self).__init__(**kwargs)

        def hybrid_forward(self, F, x):
            return F.flip(x, axis=0)

    data_shape = (10,)
    num_ops = 1000
    num_iterations = 20
    warmups = 1

    # Input, its gradient buffer and the head gradient fed to backward().
    x = mx.ndarray.zeros(data_shape)
    x.attach_grad()
    dy = mx.ndarray.ones(data_shape)

    # Model: `num_ops` Flip blocks applied back to back.
    net = nn.HybridSequential()
    with net.name_scope():
        for _ in range(num_ops):
            net.add(Flip())
    net.hybridize(static_alloc=True, static_shape=True)

    # Run the warm-up iterations first; the clock starts at the first timed iteration.
    total_iterations = num_iterations + warmups
    for iteration in range(total_iterations):
        with autograd.record():
            if iteration == warmups:
                start = time.time()
            y = net(x)
            y.backward(dy)
            x.grad.wait_to_read()

    elapsed = time.time() - start
    time_per_iteration.value = elapsed / num_iterations

@with_seed()
def test_bulking():
    """Check that larger op-bulking segment settings give faster training iterations.

    Each configuration is timed in its own spawned process so that the env vars
    take effect; the test returns early if the spawned run could not be performed.
    """
    # test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training)
    test_cases = [(0,0,True), (1,1,True), (15,15,False), (15,0,True), (0,15,True), (15,15,True)]
    times = {}
    report_parts = []
    for case in test_cases:
        fwd_seg_size, bwd_seg_size, bulk_in_training = case
        # Shared variable through which the test process returns its measured time
        time_per_iteration = mp.Manager().Value('d', 0.0)
        env_settings = {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD' : fwd_seg_size,
                        'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD' : bwd_seg_size,
                        'MXNET_EXEC_BULK_EXEC_TRAIN' : bulk_in_training}
        ran = run_in_spawned_process(_test_bulking_in_process, env_settings, time_per_iteration)
        if not ran:
            # skip test since the python version can't run it properly. Warning msg was logged.
            return
        times[case] = time_per_iteration.value
        report_parts.append(
            '\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format(
                fwd_seg_size, bwd_seg_size, bulk_in_training, 1000.0 * times[case]))
    times_str = ''.join(report_parts)

    non_bulked_times = [times[(0,0,True)], times[(1,1,True)], times[(15,15,False)]]
    half_bulked_times = [times[(0,15,True)], times[(15,0,True)]]
    fastest_non_bulked_time = min(non_bulked_times)
    slowest_half_bulked_time = max(half_bulked_times)
    fastest_half_bulked_time = min(half_bulked_times)
    fully_bulked_time = times[(15,15,True)]

    print(times_str)
    # The non-bulked settings (0,0,True), (1,1,True) and (15,15,False) should take about the
    # same time, and be slower than both half-bulked settings (0,15,True) and (15,0,True)
    assert slowest_half_bulked_time < fastest_non_bulked_time, \
        'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \
        .format(slowest_half_bulked_time - fastest_non_bulked_time, times_str)
    # The fully bulked setting (15,15,True) should be faster than both half-bulked runs
    assert fully_bulked_time < fastest_half_bulked_time, \
        'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \
        .format(fully_bulked_time - fastest_half_bulked_time, times_str)


if __name__ == '__main__':
import nose
Expand Down
Loading

0 comments on commit 208e76b

Please sign in to comment.