Source code for mmengine._strategy.deepspeed
# Copyright (c) OpenMMLab. All rights reserved.
import json
import os.path as osp
import time
from typing import Callable, Dict, List, Optional, Union
try:
import deepspeed
except ImportError:
deepspeed = None
import torch.nn as nn
import mmengine
from mmengine.dist import init_dist
from mmengine.model.wrappers._deepspeed import MMDeepSpeedEngineWrapper
from mmengine.optim import BaseOptimWrapper, _ParamScheduler
from mmengine.registry import STRATEGIES
from mmengine.utils import get_git_hash
from .base import BaseStrategy
@STRATEGIES.register_module()
class DeepSpeedStrategy(BaseStrategy):
"""Support training models with DeepSpeed.
Note:
The detailed usage of parameters can be found at
https://www.deepspeed.ai/docs/config-json/.
Args:
        config (str or dict, optional): If it is a string, it is treated as
            a path to a DeepSpeed JSON config file. Defaults to None.
        zero_optimization (dict, optional): Enabling and configuring ZeRO
            memory optimizations. Defaults to None.
        gradient_clipping (float): Enable gradient clipping with this value.
            Defaults to 1.0.
        fp16 (dict, optional): Configuration for using mixed precision/FP16
            training that leverages NVIDIA's Apex package. Defaults to None.
        inputs_to_half (list[int or str], optional): Which inputs are to be
            converted to half precision. If ``fp16`` is enabled, this should
            also be set. Defaults to None.
bf16 (dict, optional): Configuration for using bfloat16 floating-point
format as an alternative to FP16. Defaults to None.
amp (dict, optional): Configuration for using automatic mixed
precision (AMP) training that leverages NVIDIA's Apex AMP package.
Defaults to None.
activation_checkpointing (dict, optional): Reduce memory usage by
clearing activations of certain layers and recomputing them
during a backward pass.
Defaults to None.
aio (dict, optional): Configuring the asynchronous I/O module for
offloading parameter and optimizer states to persistent (NVMe)
storage. This module uses Linux native asynchronous I/O (libaio).
Defaults to None.
"""
def __init__(
self,
*,
# the following args are for deepspeed
config: Union[str, dict, None] = None,
zero_optimization: Optional[dict] = None,
gradient_clipping: float = 1.0,
fp16: Optional[dict] = None,
inputs_to_half: Optional[List[Union[int, str]]] = None,
bf16: Optional[dict] = None,
amp: Optional[dict] = None,
activation_checkpointing: Optional[dict] = None,
aio: Optional[dict] = None,
train_micro_batch_size_per_gpu: Optional[int] = None,
gradient_accumulation_steps: int = 1,
        # disable the log printed by deepspeed
steps_per_print: int = 10000000000000,
# the following args are for BaseStrategy
**kwargs,
):
assert deepspeed is not None, \
'DeepSpeed is not installed. Please check ' \
'https://github.com/microsoft/DeepSpeed#installation.'
super().__init__(**kwargs)
self.config = self._parse_config(config)
if zero_optimization is not None:
self.config['zero_optimization'] = zero_optimization
self.config['gradient_clipping'] = gradient_clipping
if fp16 is not None:
self.config['fp16'] = fp16
if bf16 is not None:
self.config['bf16'] = bf16
if amp is not None:
self.config['amp'] = amp
if activation_checkpointing is not None:
self.config['activation_checkpointing'] = activation_checkpointing
if aio is not None:
self.config['aio'] = aio
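
        # DeepSpeed requires the batch size to be defined: if neither
        # ``train_micro_batch_size_per_gpu`` nor ``train_batch_size`` is
        # already present in the config, the explicit argument must be given.
        # When the argument is given, it overrides the value from the config.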
if ('train_micro_batch_size_per_gpu' not in self.config
and 'train_batch_size' not in self.config):
assert train_micro_batch_size_per_gpu is not None, (
'`train_micro_batch_size_per_gpu` or `train_batch_size` '
'should be set!')
self.config['train_micro_batch_size_per_gpu'] = \
train_micro_batch_size_per_gpu
if train_micro_batch_size_per_gpu is not None:
self.config['train_micro_batch_size_per_gpu'] = \
train_micro_batch_size_per_gpu
self.config['gradient_accumulation_steps'] = \
gradient_accumulation_steps
self.config['steps_per_print'] = steps_per_print
self._inputs_to_half = inputs_to_half
def _parse_config(self, config):
if config is None:
config = dict()
elif isinstance(config, str):
with open(config) as f:
config = json.load(f)
return config
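
    # For reference, the ``config`` argument may also be passed directly as a
    # dict using the same keys as the DeepSpeed JSON schema
    # (https://www.deepspeed.ai/docs/config-json/); the values below are
    # purely illustrative:
    #
    #   config = dict(
    #       train_micro_batch_size_per_gpu=8,
    #       gradient_accumulation_steps=2,
    #       fp16=dict(enabled=True),
    #       zero_optimization=dict(stage=2),
    #   )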
def _setup_distributed( # type: ignore
self,
launcher: Optional[str] = None,
backend: str = 'nccl',
**kwargs,
):
"""Setup distributed environment.
Args:
            launcher (str, optional): The way to launch multiple processes.
                DeepSpeedStrategy does not support the launcher argument.
backend (str): Communication Backends. Supported backends are
'nccl', 'gloo' and 'mpi'. Defaults to 'nccl'.
**kwargs: Other arguments for :func:`deepspeed.init_distributed`.
"""
init_dist(launcher, backend, init_backend='deepspeed', **kwargs)
    def prepare(
self,
model: Union[nn.Module, dict],
*,
optim_wrapper: Union[BaseOptimWrapper, dict, None] = None,
param_scheduler: Union[_ParamScheduler, Dict, List, None] = None,
compile: Union[dict, bool] = False,
dispatch_kwargs: Optional[dict] = None,
):
"""Prepare model and some components.
Args:
            model (:obj:`torch.nn.Module` or dict): The model to be run. It
                can be a dict used to build a model.
Keyword Args:
optim_wrapper (BaseOptimWrapper or dict, optional): Computing the
gradient of model parameters and updating them.
Defaults to None.
See :meth:`build_optim_wrapper` for examples.
param_scheduler (_ParamScheduler or dict or list, optional):
Parameter scheduler for updating optimizer parameters. If
specified, :attr:`optim_wrapper` should also be specified.
Defaults to None.
See :meth:`build_param_scheduler` for examples.
            compile (dict or bool): Config to compile the model.
                Defaults to False. Requires PyTorch >= 2.0.
dispatch_kwargs (dict, optional): Kwargs to be passed to other
methods of Strategy. Defaults to None.
"""
if self._prepared:
return self._prepared_components()
assert dispatch_kwargs is not None
self.dispatch_kwargs.update(dispatch_kwargs)
model = self.build_model(model)
model = self._init_model_weights(model)
if optim_wrapper is not None:
self.optim_wrapper = self.build_optim_wrapper(optim_wrapper, model)
self.model = self._wrap_model(model)
self.optim_wrapper.model = self.model # type: ignore
else:
self.model = self._wrap_model(model)
if param_scheduler is not None:
self.param_schedulers = self.build_param_scheduler(
param_scheduler, self.optim_wrapper)
self._prepared = True
return self._prepared_components()
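
    # ``_prepared_components`` (inherited from ``BaseStrategy``) is expected
    # to return the wrapped model together with the optimizer wrapper and
    # parameter schedulers when they have been built, so calling ``prepare``
    # again is a cheap no-op once ``self._prepared`` is set.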
def _wrap_model(self, model: nn.Module) -> nn.Module:
if hasattr(self, 'optim_wrapper'):
engine, self.optim_wrapper.optimizer, *_ = deepspeed.initialize(
model=model,
optimizer=self.optim_wrapper.optimizer,
config=self.config)
else:
engine, *_ = deepspeed.initialize(model=model, config=self.config)
wrapper = MMDeepSpeedEngineWrapper(
model=engine, inputs_to_half=self._inputs_to_half)
return wrapper
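
    # Note: ``deepspeed.initialize`` returns a tuple of
    # ``(engine, optimizer, training_dataloader, lr_scheduler)``. Only the
    # engine (and, when an optimizer was passed in, the DeepSpeed-wrapped
    # optimizer) is kept; the engine is then wrapped so that the inputs named
    # in ``inputs_to_half`` can be cast to half precision.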
    def load_checkpoint(
self,
filename: str,
*,
map_location: Union[str, Callable] = 'cpu',
strict: bool = False,
revise_keys: list = [(r'^module.', '')],
callback: Optional[Callable] = None,
) -> dict:
"""Load checkpoint from given ``filename``.
Warning:
            `map_location` and `callback` parameters are not supported yet.
Args:
            filename (str): Accepts a local filepath, URL, ``torchvision://xxx``
                or ``open-mmlab://xxx``.
"""
self.logger.info(f'Load checkpoint from {filename}')
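        # DeepSpeed checkpoints are directories rather than single files, so
        # ``filename`` is split into a parent directory and a tag
        # (sub-directory) that DeepSpeed uses to locate its shards.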
dirname, basename = osp.split(filename)
_, extra_ckpt = self.model.load_checkpoint(
dirname, tag=basename, load_optimizer_states=False)
return extra_ckpt
    def resume(
self,
filename: str,
*,
resume_optimizer: bool = True,
resume_param_scheduler: bool = True,
map_location: Union[str, Callable] = 'default',
callback: Optional[Callable] = None,
) -> dict:
"""Resume training from given ``filename``.
Warning:
`map_location` and `callback` parameters are not supported yet.
Args:
            filename (str): Accepts a local filepath.
Keyword Args:
resume_optimizer (bool): Whether to resume optimizer state.
Defaults to True.
resume_param_scheduler (bool): Whether to resume param scheduler
state. Defaults to True.
"""
self.logger.info(f'Resume checkpoint from {filename}')
dirname, basename = osp.split(filename)
_, extra_ckpt = self.model.load_checkpoint(
dirname, tag=basename, load_optimizer_states=resume_optimizer)
if resume_optimizer:
self.load_optim_state_dict(extra_ckpt.pop('optim_wrapper'))
if resume_param_scheduler and hasattr(self, 'param_schedulers'):
param_schedulers = extra_ckpt.pop('param_schedulers')
self.load_scheduler_state_dict(param_schedulers)
# resume random seed
resumed_seed = extra_ckpt['meta'].get('seed', None)
current_seed = self._randomness.get('seed')
if resumed_seed is not None and resumed_seed != current_seed:
if current_seed is not None:
self.logger.warning(f'The value of random seed in the '
f'checkpoint "{resumed_seed}" is '
f'different from the value in '
f'`randomness` config "{current_seed}"')
self._randomness.update(seed=resumed_seed)
self._set_randomness(**self._randomness)
return extra_ckpt
    def save_checkpoint(
self,
filename: str,
*,
save_optimizer: bool = True,
save_param_scheduler: bool = True,
extra_ckpt: Optional[dict] = None,
callback: Optional[Callable] = None,
) -> None:
"""Save checkpoint to given ``filename``.
Warning:
`save_optimizer` and `callback` parameters are not supported yet.
Args:
filename (str): Filename to save checkpoint.
Keyword Args:
save_param_scheduler (bool): Whether to save the param_scheduler
to the checkpoint. Defaults to True.
extra_ckpt (dict, optional): Extra checkpoint to save.
Defaults to None.
"""
if extra_ckpt is None:
extra_ckpt = dict()
if 'meta' not in extra_ckpt:
extra_ckpt['meta'] = dict()
extra_ckpt['meta'].update(
seed=self.seed,
time=time.strftime('%Y%m%d_%H%M%S', time.localtime()),
mmengine=mmengine.__version__ + get_git_hash(),
)
if save_optimizer and hasattr(self, 'optim_wrapper'):
            # The key cannot be 'optimizer', otherwise an error will be
            # thrown when loading or resuming the checkpoint.
extra_ckpt['optim_wrapper'] = self.optim_state_dict()
if save_param_scheduler and hasattr(self, 'param_schedulers'):
extra_ckpt['param_schedulers'] = self.scheduler_state_dict()
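        # Mirroring ``load_checkpoint``: DeepSpeed writes a checkpoint
        # directory, so ``dirname/basename`` will hold the sharded states
        # together with the ``client_state`` assembled above;
        # ``save_latest=False`` skips writing DeepSpeed's ``latest`` tag file.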
dirname, basename = osp.split(filename)
self.model.save_checkpoint(
dirname, tag=basename, client_state=extra_ckpt, save_latest=False)
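

# ---------------------------------------------------------------------------
# Usage sketch (not part of the module): a minimal, hedged example of how this
# strategy is typically configured. It assumes mmengine's ``FlexibleRunner``
# (which accepts a ``strategy``) and the ``DeepSpeedOptimWrapper``; the model
# and dataloader names are placeholders, and running it requires DeepSpeed
# plus a distributed launcher such as ``torchrun``.
# ---------------------------------------------------------------------------
#
#   from mmengine.runner import FlexibleRunner
#
#   strategy = dict(
#       type='DeepSpeedStrategy',
#       fp16=dict(enabled=True, initial_scale_power=16),
#       inputs_to_half=[0],            # cast the first positional input
#       zero_optimization=dict(stage=2),
#       gradient_accumulation_steps=2,
#       train_micro_batch_size_per_gpu=8,
#   )
#   runner = FlexibleRunner(
#       model=my_model,                # an ``nn.Module`` or a model config
#       work_dir='./work_dir',
#       strategy=strategy,
#       train_dataloader=my_train_dataloader,
#       optim_wrapper=dict(
#           type='DeepSpeedOptimWrapper',
#           optimizer=dict(type='AdamW', lr=1e-4)),
#       train_cfg=dict(by_epoch=True, max_epochs=10),
#   )
#   runner.train()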