Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some fixes to make training work #5

Open
wants to merge 13 commits into
base: optim-linear
Choose a base branch
from
10 changes: 7 additions & 3 deletions deepspeed/linear/optimized_linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import torch
import math
import torch.nn as nn

import torch.nn.functional as F
from deepspeed.accelerator import get_accelerator
import deepspeed.comm as dist

Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self,
self.zero_shards = self.lora_config.base_weight_sharding
self.sharded_weight_size = int(float(self.input_dim) // self.zero_shards)
w = torch.nn.Parameter(torch.empty((self.output_dim, self.sharded_weight_size), dtype=dtype))

torch.nn.init.xavier_uniform_(w)
if self.quantization_config is not None:
self.base_weight = QuantizedParameter(w)
else:
Expand Down Expand Up @@ -136,6 +136,10 @@ def forward(self, input_tensor):
else:
base_weight = self.base_weight

base_weight_output = self.linear_without_F_linear(input_tensor, base_weight)
# if torch.distributed.get_rank() == 0:
# import pdb; pdb.set_trace()
# torch.distributed.barrier()
base_weight_output = F.linear(input_tensor, base_weight)
lora_output = self.lora_weight_2(self.lora_weight_1(input_tensor))
# torch.distributed.barrier()
return base_weight_output + self.lora_scaling_factor * lora_output
26 changes: 26 additions & 0 deletions deepspeed/moe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,29 @@ def split_params_into_different_moe_groups_for_optimizer(

def is_moe_param_group(param_group):
return param_group.get('moe', False)


def configure_moe_param_groups(model_parameters: List):
# peak at the first element to determine how to proceed
first = model_parameters[0]

# match torch.optim.Optimizer expectations
if not isinstance(first, (torch.Tensor, dict)):
raise TypeError("param argument that would be given to the optimizer should be "
f"an iterable of Tensors or dicts, but got {type(first)}")

# Case 1: model_parameters is a list of torch.nn.Parameter
# -> need to create moe compatible param groups
if isinstance(first, torch.nn.Parameter):
param_group = {'params': model_parameters, 'name': 'dense-params'}
return split_params_into_different_moe_groups_for_optimizer(param_group)

# Case 2: model_parameters is a list of param groups List[dict]
# -> moe compatible param groups might already exist, if not create them
elif isinstance(first, dict):
#there are no moe groups created
if not any(['moe' in param_group for param_group in model_parameters]):
return split_params_into_different_moe_groups_for_optimizer(model_parameters)
else:
# moe groups exist, nothing to do
return model_parameters
4 changes: 3 additions & 1 deletion deepspeed/runtime/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
from ..ops.adam import FusedAdam
from ..moe.sharded_moe import TopKGate, MOELayer
from ..moe.layer import MoE
from ..moe.utils import is_moe_param
from ..moe.utils import is_moe_param, configure_moe_param_groups
from ..git_version_info import version

from deepspeed.profiling.flops_profiler.profiler import FlopsProfiler
Expand Down Expand Up @@ -1227,6 +1227,8 @@ def _do_optimizer_sanity_check(self, basic_optimizer):
# Configure optimizer
def _configure_optimizer(self, client_optimizer, model_parameters):
if client_optimizer is None:
if self.has_moe_layers:
model_parameters = configure_moe_param_groups(model_parameters)
basic_optimizer = self._configure_basic_optimizer(model_parameters)
log_dist(f"Using DeepSpeed Optimizer param name {self.optimizer_name()} as basic optimizer", ranks=[0])
else:
Expand Down
36 changes: 36 additions & 0 deletions tests/unit/moe/test_moe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,42 @@
from deepspeed.runtime.utils import required_torch_version


@pytest.mark.parametrize("zero_stage", [0, 1, 2])
class TestSimpleMoE(DistributedTest):
world_size = 2

def test(self, zero_stage):
if not required_torch_version(min_version=1.8):
pytest.skip("DeepSpeed MoE tests need torch 1.8 or higher to run correctly")

config_dict = {
"train_micro_batch_size_per_gpu": 1,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.00015
}
},
"fp16": {
"enabled": True
},
"zero_optimization": {
"stage": zero_stage
}
}
# should automatically create moe param groups in deepspeed backend
hidden_dim = 16
model = SimpleMoEModel(hidden_dim=hidden_dim, ep_size=1)
model, optimizer, _, _ = deepspeed.initialize(config=config_dict, model=model)
data_loader = sequence_dataloader(model=model, total_samples=50, hidden_dim=hidden_dim, device=model.device)

for n, batch in enumerate(data_loader):
loss = model(batch[0], batch[1])
model.backward(loss)
model.step()


@pytest.mark.parametrize("ep_size", [2, 4])
@pytest.mark.parametrize("zero_stage", [0, 1, 2])
@pytest.mark.parametrize("use_residual", [True, False])
Expand Down