diff --git a/docs/source-pytorch/accelerators/gpu_faq.rst b/docs/source-pytorch/accelerators/gpu_faq.rst index 25f108ce72f8e..ba95554b292b0 100644 --- a/docs/source-pytorch/accelerators/gpu_faq.rst +++ b/docs/source-pytorch/accelerators/gpu_faq.rst @@ -38,18 +38,6 @@ In DDP, DDP_SPAWN, Deepspeed, DDP_SHARDED your effective batch size will be 7 * .. note:: Huge batch sizes are actually really bad for convergence. Check out: `Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour `_ -In DP, which does not support multi-node, the effective batch size will be just 7, regardless of how many devices are being used. -The reason is that the full batch gets split evenly between all devices. - -.. code-block:: python - - # effective batch size = 7, each GPU sees a batch size of 1 except the last GPU - Trainer(accelerator="gpu", devices=8, strategy="dp") - - # effective batch size = 7, first GPU sees a batch size of 4, the other sees batch size 3 - Trainer(accelerator="gpu", devices=2, num_nodes=10, strategy="dp") - - ---- @@ -57,11 +45,11 @@ The reason is that the full batch gets split evenly between all devices. How do I use multiple GPUs on Jupyter or Colab notebooks? ********************************************************* -To use multiple GPUs on notebooks, use the *DDP_SPAWN*, *DDP_NOTEBOOK*, or *DP* mode. +To use multiple GPUs on notebooks, use the *DDP_SPAWN* or *DDP_NOTEBOOK* mode. .. code-block:: python - Trainer(accelerator="gpu", devices=4, strategy="ddp_notebook" | "ddp_spawn" | "dp") + Trainer(accelerator="gpu", devices=4, strategy="ddp_notebook" | "ddp_spawn") If you want to use other models, please launch your training via the command-shell. diff --git a/docs/source-pytorch/accelerators/gpu_intermediate.rst b/docs/source-pytorch/accelerators/gpu_intermediate.rst index 6deca436530a1..9af6b5b1e7ff8 100644 --- a/docs/source-pytorch/accelerators/gpu_intermediate.rst +++ b/docs/source-pytorch/accelerators/gpu_intermediate.rst @@ -20,7 +20,6 @@ Lightning supports multiple ways of doing distributed training. | -- Data Parallel (``strategy='dp'``) (multiple-gpus, 1 machine) - DistributedDataParallel (multiple-gpus across many machines) - Regular (``strategy='ddp'``) - Spawn (``strategy='ddp_spawn'``) @@ -33,28 +32,6 @@ For a deeper understanding of what Lightning is doing, feel free to read this `guide `_. -Data Parallel -^^^^^^^^^^^^^ -:class:`~torch.nn.DataParallel` (DP) splits a batch across k GPUs. -That is, if you have a batch of 32 and use DP with 2 GPUs, each GPU will process 16 samples, -after which the root node will aggregate the results. - -.. warning:: DP use is discouraged by PyTorch and Lightning. State is not maintained on the replicas created by the - :class:`~torch.nn.DataParallel` wrapper and you may see errors or misbehavior if you assign state to the module - in the ``forward()`` or ``*_step()`` methods. For the same reason we cannot fully support - :doc:`Manual Optimization <../model/manual_optimization>` with DP. Use DDP which is more stable and at least 3x faster. - -.. warning:: DP only supports scattering and gathering primitive collections of tensors like lists, dicts, etc. - Therefore :meth:`~pytorch_lightning.core.hooks.ModelHooks.transfer_batch_to_device` and - :meth:`~pytorch_lightning.core.hooks.ModelHooks.on_after_batch_transfer` - do not apply in this mode and if you have overridden any of them, an exception will be raised. - -.. 
testcode:: - :skipif: torch.cuda.device_count() < 2 - - # train on 2 GPUs (using DP mode) - trainer = Trainer(accelerator="gpu", devices=2, strategy="dp") - Distributed Data Parallel ^^^^^^^^^^^^^^^^^^^^^^^^^ :class:`~torch.nn.parallel.DistributedDataParallel` (DDP) works as follows: @@ -189,7 +166,6 @@ The Trainer enables it by default when such environments are detected. # can also be used in non-interactive environments trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp_fork") -Data Parallel (``strategy="dp"``) is the only other strategy supported in interactive environments but is slower, is discouraged by PyTorch and has other limitations. Among the native distributed strategies, regular DDP (``strategy="ddp"``) is still recommended as the go-to strategy over Spawn and Fork/Notebook for its speed and stability but it can only be used with scripts. @@ -234,107 +210,24 @@ Comparison of DDP variants and tradeoffs - Fast -DP caveats -^^^^^^^^^^ -In DP each GPU within a machine sees a portion of a batch. -It does roughly the following: - -.. testcode:: - - def distributed_forward(batch, model): - batch = torch.Tensor(32, 8) - gpu_0_batch = batch[:8] - gpu_1_batch = batch[8:16] - gpu_2_batch = batch[16:24] - gpu_3_batch = batch[24:] - - y_0 = model_copy_gpu_0(gpu_0_batch) - y_1 = model_copy_gpu_1(gpu_1_batch) - y_2 = model_copy_gpu_2(gpu_2_batch) - y_3 = model_copy_gpu_3(gpu_3_batch) - - return [y_0, y_1, y_2, y_3] - -So, when Lightning calls any of the `training_step`, `validation_step`, `test_step` -you will only be operating on one of those pieces. - -.. testcode:: - - # the batch here is a portion of the FULL batch - def training_step(self, batch, batch_idx): - y_0 = batch - -For most metrics, this doesn't really matter. However, if you want to add something to your computational graph using -all batch parts you can use the `training_step_end` step. - -.. testcode:: - - def training_step_end(self, outputs): - # only use when on dp - outputs = torch.cat(outputs, dim=1) - softmax = softmax(outputs, dim=1) - out = softmax.mean() - return out - -In pseudocode, the full sequence is: - -.. code-block:: python - - # get data - batch = next(dataloader) - - # copy model and data to each gpu - batch_splits = split_batch(batch, num_gpus) - models = copy_model_to_gpus(model) - - # in parallel, operate on each batch chunk - all_results = [] - for gpu_num in gpus: - batch_split = batch_splits[gpu_num] - gpu_model = models[gpu_num] - out = gpu_model(batch_split) - all_results.append(out) - - # use the full batch for something like softmax - full_out = model.training_step_end(all_results) - -If `training_step_end` is defined it will be called regardless of TPU, DP, DDP, etc... which means -it will behave the same regardless of the backend. - -Validation and test step have the same option when using DP. - -.. testcode:: - - def validation_step_end(self, step_output): - ... - - - def test_step_end(self, step_output): - ... - - Distributed and 16-bit precision ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Below are the possible configurations we support. 
-+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| 1 GPU | 1+ GPUs | DDP | DP | 16-bit | command | -+=======+=========+=====+=====+========+=======================================================================+ -| Y | | | | | `Trainer(accelerator="gpu", devices=1)` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| Y | | | | Y | `Trainer(accelerator="gpu", devices=1, precision=16)` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| | Y | Y | | | `Trainer(accelerator="gpu", devices=k, strategy='ddp')` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| | Y | Y | | Y | `Trainer(accelerator="gpu", devices=k, strategy='ddp', precision=16)` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| | Y | | Y | | `Trainer(accelerator="gpu", devices=k, strategy='dp')` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ -| | Y | | Y | Y | `Trainer(accelerator="gpu", devices=k, strategy='dp', precision=16)` | -+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+ - -DDP and DP can also be used with 1 GPU, but there's no reason to do so other than debugging distributed-related issues. ++-------+---------+-----+--------+-----------------------------------------------------------------------+ +| 1 GPU | 1+ GPUs | DDP | 16-bit | command | ++=======+=========+=====+========+=======================================================================+ +| Y | | | | `Trainer(accelerator="gpu", devices=1)` | ++-------+---------+-----+--------+-----------------------------------------------------------------------+ +| Y | | | Y | `Trainer(accelerator="gpu", devices=1, precision=16)` | ++-------+---------+-----+--------+-----------------------------------------------------------------------+ +| | Y | Y | | `Trainer(accelerator="gpu", devices=k, strategy='ddp')` | ++-------+---------+-----+--------+-----------------------------------------------------------------------+ +| | Y | Y | Y | `Trainer(accelerator="gpu", devices=k, strategy='ddp', precision=16)` | ++-------+---------+-----+--------+-----------------------------------------------------------------------+ + +DDP can also be used with 1 GPU, but there's no reason to do so other than debugging distributed-related issues. Implement Your Own Distributed (DDP) training diff --git a/docs/source-pytorch/api_references.rst b/docs/source-pytorch/api_references.rst index cd9007e5819ee..12009f2f705a7 100644 --- a/docs/source-pytorch/api_references.rst +++ b/docs/source-pytorch/api_references.rst @@ -216,7 +216,6 @@ strategies ColossalAIStrategy DDPSpawnStrategy DDPStrategy - DataParallelStrategy DeepSpeedStrategy FSDPStrategy HPUParallelStrategy diff --git a/docs/source-pytorch/common/lightning_module.rst b/docs/source-pytorch/common/lightning_module.rst index 51cd7ade01750..82b70c4c84a83 100644 --- a/docs/source-pytorch/common/lightning_module.rst +++ b/docs/source-pytorch/common/lightning_module.rst @@ -261,52 +261,6 @@ override the :meth:`~pytorch_lightning.LightningModule.on_training_epoch_end` me ... 
self.training_step_outputs.clear() # free memory -Training with DataParallel -========================== - -When training using a ``strategy`` that splits data from each batch across GPUs, sometimes you might -need to aggregate them on the main GPU for processing (DP). - -In this case, implement the :meth:`~pytorch_lightning.core.module.LightningModule.training_step_end` -method which will have outputs from all the devices and you can accumulate to get the effective results. - -.. code-block:: python - - def training_step(self, batch, batch_idx): - x, y = batch - y_hat = self.model(x) - loss = F.cross_entropy(y_hat, y) - pred = ... - return {"loss": loss, "pred": pred} - - - def training_step_end(self, batch_parts): - # predictions from each GPU - predictions = batch_parts["pred"] - # losses from each GPU - losses = batch_parts["loss"] - - gpu_0_prediction = predictions[0] - gpu_1_prediction = predictions[1] - - # do something with both outputs - return (losses[0] + losses[1]) / 2 - - -Here is the Lightning training pseudo-code for DP: - -.. code-block:: python - - for batch_idx, train_batch in enumerate(train_dataloader): - batches = split_batch(train_batch) - dp_outs = [] - for sub_batch in batches: - # 1 - dp_out = training_step(sub_batch, batch_idx) - dp_outs.append(dp_out) - - # 2 - training_step_end(dp_outs) ------------------ @@ -399,54 +353,6 @@ Note that this method is called before :meth:`~pytorch_lightning.LightningModule ... self.validation_step_outputs.clear() # free memory - -Validating with DataParallel -============================ - -When validating using a ``strategy`` that splits data from each batch across GPUs, sometimes you might -need to aggregate them on the main GPU for processing (DP). - -In this case, implement the :meth:`~pytorch_lightning.core.module.LightningModule.validation_step_end` -method which will have outputs from all the devices and you can accumulate to get the effective results. - -.. code-block:: python - - def validation_step(self, batch, batch_idx): - x, y = batch - y_hat = self.model(x) - loss = F.cross_entropy(y_hat, y) - pred = ... - return {"loss": loss, "pred": pred} - - - def validation_step_end(self, batch_parts): - # predictions from each GPU - predictions = batch_parts["pred"] - # losses from each GPU - losses = batch_parts["loss"] - - gpu_0_prediction = predictions[0] - gpu_1_prediction = predictions[1] - - # do something with both outputs - return (losses[0] + losses[1]) / 2 - - -Here is the Lightning validation pseudo-code for DP: - -.. code-block:: python - - for batch in dataloader: - batches = split_batch(batch) - dp_outs = [] - for sub_batch in batches: - # 1 - dp_out = validation_step(sub_batch) - dp_outs.append(dp_out) - - # 2 - validation_step_end(dp_outs) - ---------------- ******* diff --git a/docs/source-pytorch/extensions/strategy.rst b/docs/source-pytorch/extensions/strategy.rst index c92de855c440a..429131ef03944 100644 --- a/docs/source-pytorch/extensions/strategy.rst +++ b/docs/source-pytorch/extensions/strategy.rst @@ -81,9 +81,6 @@ The below table lists all relevant strategies available in Lightning with their * - ddp - :class:`~pytorch_lightning.strategies.DDPStrategy` - Strategy for multi-process single-device training on one or multiple nodes. :ref:`Learn more. ` - * - dp - - :class:`~pytorch_lightning.strategies.DataParallelStrategy` - - Implements data-parallel training in a single process, i.e., the model gets replicated to each device and each gets a split of the data. :ref:`Learn more. 
` * - deepspeed - :class:`~pytorch_lightning.strategies.DeepSpeedStrategy` - Provides capabilities to run training using the DeepSpeed library, with training optimizations for large billion parameter models. :ref:`Learn more. ` diff --git a/docs/source-pytorch/guides/speed.rst b/docs/source-pytorch/guides/speed.rst index 812a19fe0cffc..b5120f394eabf 100644 --- a/docs/source-pytorch/guides/speed.rst +++ b/docs/source-pytorch/guides/speed.rst @@ -49,22 +49,9 @@ GPU Training Speedup Tips When training on single or multiple GPU machines, Lightning offers a host of advanced optimizations to improve throughput, memory efficiency, and model scaling. Refer to :doc:`Advanced GPU Optimized Training for more details <../advanced/model_parallel>`. -Prefer DDP Over DP -^^^^^^^^^^^^^^^^^^ -:class:`~pytorch_lightning.strategies.dp.DataParallelStrategy` performs three GPU transfers for EVERY batch: - -1. Copy the model to the device. -2. Copy the data to the device. -3. Copy the outputs of each device back to the main device. - -.. image:: https://pl-public-data.s3.amazonaws.com/docs/static/images/distributed_training/dp.gif - :alt: Animation showing DP execution. - :width: 500 - :align: center - | -Whereas :class:`~pytorch_lightning.strategies.ddp.DDPStrategy` only performs two transfer operations, making DDP much faster than DP: +:class:`~pytorch_lightning.strategies.ddp.DDPStrategy` only performs two transfer operations for each step, making it the simplest distributed training strategy: 1. Moving data to the device. 2. Transfer and sync gradients. diff --git a/src/lightning/pytorch/CHANGELOG.md b/src/lightning/pytorch/CHANGELOG.md index 0ecd0b8beedf2..0000a45555653 100644 --- a/src/lightning/pytorch/CHANGELOG.md +++ b/src/lightning/pytorch/CHANGELOG.md @@ -288,6 +288,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Removed support for passing a scheduling dictionary to `Trainer(accumulate_grad_batches=...)` ([#16729](https://github.com/Lightning-AI/lightning/pull/16729)) +- Removed support for `DataParallel` (`strategy='dp'`) and the `LightningParallelModule`-Wrapper, ([#16748](https://github.com/Lightning-AI/lightning/pull/16748)) + + - Removed the unused `lightning.pytorch.utilities.supporters.{SharedCycleIteratorState,CombinedLoaderIterator}` classes ([#16714](https://github.com/Lightning-AI/lightning/pull/16714)) diff --git a/src/lightning/pytorch/overrides/data_parallel.py b/src/lightning/pytorch/overrides/data_parallel.py deleted file mode 100644 index b13a2055eae7c..0000000000000 --- a/src/lightning/pytorch/overrides/data_parallel.py +++ /dev/null @@ -1,122 +0,0 @@ -# Copyright The Lightning AI team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
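For anyone migrating off the removed ``strategy='dp'``, a minimal sketch of the DDP setup the updated docs point to (assuming a single machine with 2 GPUs; ``BoringModel`` is the demo class already imported by the tests touched in this patch):

.. code-block:: python

    # Sketch only: runs that previously used strategy="dp" map onto DDP, which
    # launches one process per device instead of splitting each batch inside a
    # single process, so the effective batch size is batch_size * devices * num_nodes.
    import lightning.pytorch as pl
    from lightning.pytorch.demos.boring_classes import BoringModel

    if __name__ == "__main__":
        model = BoringModel()
        trainer = pl.Trainer(accelerator="gpu", devices=2, strategy="ddp", max_epochs=1)
        trainer.fit(model)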
-import numbers -import warnings -from typing import Any, Union - -import torch -from lightning_utilities.core.apply_func import apply_to_collection -from torch import Tensor - -import lightning.pytorch as pl -from lightning.pytorch.overrides.base import _LightningModuleWrapperBase, _LightningPrecisionModuleWrapperBase -from lightning.pytorch.utilities.rank_zero import rank_zero_warn - - -def _ignore_scalar_return_in_dp() -> None: - # Users get confused by this warning so we silence it - warnings.filterwarnings( - "ignore", - message=( - "Was asked to gather along dimension 0, but all input tensors were scalars;" - " will instead unsqueeze and return a vector." - ), - ) - - -class LightningParallelModule(_LightningModuleWrapperBase): - """Wraps the user's LightningModule and redirects the forward call to the appropriate method, either - ``training_step``, ``validation_step``, ``test_step``, or ``predict_step``. - - This class is used in combination with :class:`~torch.nn.parallel.DataParallel` as shown in the example. - It also takes care of converting Python scalars to Tensors and un-squeezes 0-dimensional Tensors as it is required - by :class:`~torch.nn.parallel.DataParallel`. - - Example: - - dp_model = torch.nn.DataParallel( - module=LightningParallelModule(lightning_module), - device_ids=[3, 4], - ... - ) - - Args: - forward_module: The module to wrap. If it's not a ``LightningModule``, it must have an attribute ``.module`` - pointing to a ``LightningModule`` reference. - """ - - def __init__( - self, - forward_module: Union["pl.LightningModule", _LightningPrecisionModuleWrapperBase], - ) -> None: - super().__init__(forward_module=forward_module) - _ignore_scalar_return_in_dp() - - def forward(self, *inputs: Any, **kwargs: Any) -> Any: - self.update_replica_device_attributes(inputs) - # forward call will redirect to training_step, validation_step, etc. - output = super().forward(*inputs, **kwargs) - - def output_transform(data: Any) -> Any: - device = self.lightning_module.device - data = python_scalar_to_tensor(data, device) - data = unsqueeze_scalar_tensor(data) - return data - - output = apply_to_collection(output, dtype=(numbers.Number, Tensor), function=output_transform) - return output - - def update_replica_device_attributes(self, inputs: Any) -> None: - """Updates the device information of LightningModule by reading the device from the inputs. In - :class:`~torch.nn.data_parallel.DataParallel` changes to the state during the `forward` pass are lost when - the replicas get discarded. The only way to know the current device is from the inputs passed into the - model. - - Args: - inputs: A collection of inputs (typically a tuple). If the inputs don't contain tensors, - a warning is shown that accessing ``self.device`` will not return the correct device. - """ - replica_device = None - - def find_tensor_with_device(tensor: Tensor) -> Tensor: - nonlocal replica_device - if replica_device is None and tensor.device != torch.device("cpu"): - replica_device = tensor.device - return tensor - - apply_to_collection(inputs, dtype=Tensor, function=find_tensor_with_device) - - if replica_device is not None: - # by calling .to() we force the update to the self.device property - self._forward_module.to(device=replica_device) - else: - rank_zero_warn( - "Could not determine on which device the inputs are." - " When using DataParallel (strategy='dp'), be aware that in case you are using self.device" - " in your code, it will reference only the root device." 
- ) - - -def python_scalar_to_tensor(data: Any, device: Union[str, torch.device] = torch.device("cpu")) -> Any: - """Converts a Python scalar number to a torch tensor and places it on the given device.""" - if isinstance(data, numbers.Number): - data = torch.tensor([data], device=device) - return data - - -def unsqueeze_scalar_tensor(data: Any) -> Any: - """Un-squeezes a 0-dim tensor.""" - if isinstance(data, Tensor) and data.dim() == 0: - data = data.unsqueeze(0) - return data diff --git a/src/lightning/pytorch/strategies/__init__.py b/src/lightning/pytorch/strategies/__init__.py index f4c21aa076a81..89961aaa557fd 100644 --- a/src/lightning/pytorch/strategies/__init__.py +++ b/src/lightning/pytorch/strategies/__init__.py @@ -16,7 +16,6 @@ from lightning.pytorch.strategies.ddp import DDPStrategy # noqa: F401 from lightning.pytorch.strategies.ddp_spawn import DDPSpawnStrategy # noqa: F401 from lightning.pytorch.strategies.deepspeed import DeepSpeedStrategy # noqa: F401 -from lightning.pytorch.strategies.dp import DataParallelStrategy # noqa: F401 from lightning.pytorch.strategies.fsdp import FSDPStrategy # noqa: F401 from lightning.pytorch.strategies.hpu_parallel import HPUParallelStrategy # noqa: F401 from lightning.pytorch.strategies.ipu import IPUStrategy # noqa: F401 diff --git a/src/lightning/pytorch/strategies/dp.py b/src/lightning/pytorch/strategies/dp.py deleted file mode 100644 index 058bb07e67fb5..0000000000000 --- a/src/lightning/pytorch/strategies/dp.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright The Lightning AI team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
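The ``LightningParallelModule`` removed above existed only to make ``torch.nn.DataParallel`` usable with a LightningModule. A rough stand-alone sketch of the pattern it implemented, for reference; the class name and the simplified output handling here are illustrative, not Lightning API:

.. code-block:: python

    import numbers

    import torch
    from torch import nn


    class StepDispatchingWrapper(nn.Module):
        """Hypothetical example: DataParallel only replicates forward(), so the wrapper
        dispatched forward() to the active *_step method and promoted scalar outputs to
        1-dim tensors, which DataParallel's gather step requires."""

        def __init__(self, module: nn.Module, step_name: str = "training_step") -> None:
            super().__init__()
            self.module = module
            self.step_name = step_name

        def forward(self, *args, **kwargs):
            out = getattr(self.module, self.step_name)(*args, **kwargs)
            if isinstance(out, numbers.Number):
                return torch.tensor([out])  # Python scalar -> 1-element tensor
            if isinstance(out, torch.Tensor) and out.dim() == 0:
                return out.unsqueeze(0)  # 0-dim tensor -> 1-dim tensor for gather
            return out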
-from typing import Any, Dict, List, Optional, Union - -import torch -from lightning_utilities.core.apply_func import apply_to_collection -from torch import Tensor -from torch.nn import DataParallel, Module - -import lightning.pytorch as pl -from lightning.fabric.plugins import CheckpointIO -from lightning.fabric.utilities.distributed import ReduceOp -from lightning.pytorch.overrides.base import _LightningPrecisionModuleWrapperBase -from lightning.pytorch.overrides.data_parallel import LightningParallelModule -from lightning.pytorch.plugins.precision import PrecisionPlugin -from lightning.pytorch.strategies.parallel import ParallelStrategy -from lightning.pytorch.strategies.strategy import TBroadcast, TReduce -from lightning.pytorch.utilities.model_helpers import is_overridden -from lightning.pytorch.utilities.types import STEP_OUTPUT - - -class DataParallelStrategy(ParallelStrategy): - """Implements data-parallel training in a single process, i.e., the model gets replicated to each device and - each gets a split of the data.""" - - strategy_name = "dp" - - def __init__( - self, - accelerator: Optional["pl.accelerators.Accelerator"] = None, - parallel_devices: Optional[List[torch.device]] = None, - checkpoint_io: Optional[CheckpointIO] = None, - precision_plugin: Optional[PrecisionPlugin] = None, - ): - super().__init__( - accelerator=accelerator, - parallel_devices=parallel_devices, - cluster_environment=None, - checkpoint_io=checkpoint_io, - precision_plugin=precision_plugin, - ) - - @property - def global_rank(self) -> int: - return 0 - - @property - def local_rank(self) -> int: - return 0 - - @property - def node_rank(self) -> int: - return 0 - - @property - def world_size(self) -> int: - return 1 - - def setup(self, trainer: "pl.Trainer") -> None: - # model needs to be moved to the device before it is wrapped - self.model_to_device() - assert isinstance(self.model, (pl.LightningModule, _LightningPrecisionModuleWrapperBase)) - self.model = self._setup_model(LightningParallelModule(self.model)) - super().setup(trainer) - - def batch_to_device(self, batch: Any, device: Optional[torch.device] = None, dataloader_idx: int = 0) -> Any: - """Moves the batch to the correct device. - - The input and the output is the same type. - - Args: - batch: The batch of samples to move to the correct device - device: The target device - dataloader_idx: The index of the dataloader to which the batch belongs. - """ - # DataParallel handles the transfer of batch to the device - return batch - - def _setup_model(self, model: Module) -> DataParallel: - """Wraps the given model into a :class:`~torch.nn.parallel.DataParallel` module.""" - return DataParallel(module=model, device_ids=self.parallel_devices) - - def reduce( - self, collection: TReduce, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean" - ) -> TReduce: - """Reduces a collection of tensors from all processes. It can be applied to just a single tensor. - - Args: - collection: The collection of tensors to sync and reduce. - group: ignored for DP - reduce_op: ignored for DP - Return: - Reduced tensor values or the same value if it was not or did not contain a tensor. 
- """ - - def mean(t: Tensor) -> Tensor: - original_dtype = t.dtype - return t.float().mean().to(original_dtype) - - return apply_to_collection(collection, Tensor, mean) - - @property - def root_device(self) -> torch.device: - assert self.parallel_devices is not None - return self.parallel_devices[0] - - def model_to_device(self) -> None: - assert self.model is not None - self.model.to(self.root_device) - - def barrier(self, *args: Any, **kwargs: Any) -> None: - pass - - def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast: - return obj - - def reduce_boolean_decision(self, decision: bool, all: bool = True) -> bool: - return decision - - def training_step(self, *args: Any, **kwargs: Any) -> STEP_OUTPUT: - with self.precision_plugin.train_step_context(): - assert self.model is not None - return self.model(*args, **kwargs) - - def validation_step(self, *args: Any, **kwargs: Any) -> Optional[STEP_OUTPUT]: - with self.precision_plugin.val_step_context(): - assert self.model is not None - return self.model(*args, **kwargs) - - def test_step(self, *args: Any, **kwargs: Any) -> Optional[STEP_OUTPUT]: - with self.precision_plugin.test_step_context(): - assert self.model is not None - return self.model(*args, **kwargs) - - def predict_step(self, *args: Any, **kwargs: Any) -> STEP_OUTPUT: - with self.precision_plugin.predict_step_context(): - assert self.model is not None - return self.model(*args, **kwargs) - - def training_step_end(self, output: STEP_OUTPUT) -> STEP_OUTPUT: - if is_overridden("training_step_end", self.lightning_module): - return output - - if isinstance(output, dict) and "loss" in output: - output["loss"] = self.reduce(output["loss"]) - - elif isinstance(output, Tensor): - output = self.reduce(output) - - return output - - @classmethod - def register_strategies(cls, strategy_registry: Dict) -> None: - strategy_registry.register( - cls.strategy_name, - cls, - description=f"{cls.__class__.__name__}", - ) diff --git a/src/lightning/pytorch/trainer/configuration_validator.py b/src/lightning/pytorch/trainer/configuration_validator.py index 148168e5b6f38..c87b27b70966c 100644 --- a/src/lightning/pytorch/trainer/configuration_validator.py +++ b/src/lightning/pytorch/trainer/configuration_validator.py @@ -15,7 +15,6 @@ import lightning.pytorch as pl from lightning.fabric.utilities.warnings import PossibleUserWarning from lightning.pytorch.accelerators.ipu import IPUAccelerator -from lightning.pytorch.strategies import DataParallelStrategy from lightning.pytorch.trainer.states import TrainerFn from lightning.pytorch.utilities.exceptions import MisconfigurationException from lightning.pytorch.utilities.model_helpers import is_overridden @@ -120,18 +119,10 @@ def __verify_eval_loop_configuration(model: "pl.LightningModule", stage: str) -> def __verify_batch_transfer_support(trainer: "pl.Trainer") -> None: - """Raise Misconfiguration exception since these hooks are not supported in DP mode.""" batch_transfer_hooks = ("transfer_batch_to_device", "on_after_batch_transfer") datahook_selector = trainer._data_connector._datahook_selector assert datahook_selector is not None - for hook in batch_transfer_hooks: - # TODO: Remove this blocker once batch transfer to device is integrated in Lightning for DP mode. 
- if isinstance(trainer.strategy, DataParallelStrategy) and ( - is_overridden(hook, datahook_selector.model) or is_overridden(hook, datahook_selector.datamodule) - ): - raise MisconfigurationException(f"Overriding `{hook}` is not supported in DP mode.") - if isinstance(trainer.accelerator, IPUAccelerator) and ( is_overridden(hook, datahook_selector.model) or is_overridden(hook, datahook_selector.datamodule) ): diff --git a/src/lightning/pytorch/trainer/connectors/accelerator_connector.py b/src/lightning/pytorch/trainer/connectors/accelerator_connector.py index b8c42d74686f6..0f531cc09454b 100644 --- a/src/lightning/pytorch/trainer/connectors/accelerator_connector.py +++ b/src/lightning/pytorch/trainer/connectors/accelerator_connector.py @@ -227,9 +227,8 @@ def _check_config_and_set_final_flags( # MPS accelerator is incompatible with DDP family of strategies. It supports single-device operation only. is_ddp_str = isinstance(strategy, str) and "ddp" in strategy - is_dp_str = isinstance(strategy, str) and "dp" in strategy is_deepspeed_str = isinstance(strategy, str) and "deepspeed" in strategy - is_parallel_strategy = isinstance(strategy, ParallelStrategy) or is_ddp_str or is_dp_str or is_deepspeed_str + is_parallel_strategy = isinstance(strategy, ParallelStrategy) or is_ddp_str or is_deepspeed_str is_mps_accelerator = MPSAccelerator.is_available() and ( accelerator in ("mps", "auto", "gpu", None) or isinstance(accelerator, MPSAccelerator) ) @@ -482,9 +481,6 @@ def _check_strategy_and_fallback(self) -> None: or MPIEnvironment.detect() ): strategy_flag = "ddp" - if strategy_flag == "dp" and self._accelerator_flag == "cpu": - rank_zero_warn(f"{strategy_flag!r} is not supported on CPUs, hence setting `strategy='ddp'`.") - strategy_flag = "ddp" if ( strategy_flag in FSDPStrategy.get_registered_strategies() or isinstance(self._strategy_flag, FSDPStrategy) ) and self._accelerator_flag not in ("cuda", "gpu"): diff --git a/tests/tests_pytorch/models/test_amp.py b/tests/tests_pytorch/models/test_amp.py index 33f32c4765dca..d7c6922362141 100644 --- a/tests/tests_pytorch/models/test_amp.py +++ b/tests/tests_pytorch/models/test_amp.py @@ -82,7 +82,7 @@ def test_amp_cpus(tmpdir, strategy, precision, devices): trainer.predict(model) -@pytest.mark.parametrize("strategy", [None, "dp", "ddp_spawn"]) +@pytest.mark.parametrize("strategy", [None, "ddp_spawn"]) @pytest.mark.parametrize("precision", [16, pytest.param("bf16", marks=RunIf(bf16_cuda=True))]) @pytest.mark.parametrize( "devices", (pytest.param(1, marks=RunIf(min_cuda_gpus=1)), pytest.param(2, marks=RunIf(min_cuda_gpus=2))) diff --git a/tests/tests_pytorch/models/test_gpu.py b/tests/tests_pytorch/models/test_gpu.py index d66e92e07d524..b428eca521470 100644 --- a/tests/tests_pytorch/models/test_gpu.py +++ b/tests/tests_pytorch/models/test_gpu.py @@ -54,7 +54,6 @@ def test_multi_gpu_none_backend(tmpdir): @RunIf(min_cuda_gpus=2) @pytest.mark.parametrize("devices", [1, [0], [1]]) def test_single_gpu_model(tmpdir, devices): - """Make sure single GPU works (DP mode).""" trainer_options = dict( default_root_dir=tmpdir, enable_progress_bar=False, diff --git a/tests/tests_pytorch/models/test_restore.py b/tests/tests_pytorch/models/test_restore.py index 7e93189d397db..ed135a71458cb 100644 --- a/tests/tests_pytorch/models/test_restore.py +++ b/tests/tests_pytorch/models/test_restore.py @@ -21,7 +21,6 @@ import cloudpickle import pytest import torch -import torch.nn.functional as F from lightning_utilities.test.warning import no_warning_call from torch import 
Tensor @@ -84,27 +83,6 @@ class GenericValTestLossBoringModel(GenericParentValTestLossBoringModel[int]): pass -class CustomClassificationModelDP(ClassificationModel): - def _step(self, batch): - x, y = batch - logits = self(x) - return {"logits": logits, "y": y} - - def training_step(self, batch, batch_idx): - out = self._step(batch) - loss = F.cross_entropy(out["logits"], out["y"]) - return loss - - def validation_step(self, batch, batch_idx): - return self._step(batch) - - def test_step(self, batch, batch_idx): - return self._step(batch) - - def validation_step_end(self, outputs): - self.log("val_acc", self.valid_acc(outputs["logits"], outputs["y"])) - - def test_model_properties_fit_ckpt_path(tmpdir): """Test that properties like `current_epoch` and `global_step` in model and trainer are always the same.""" model = BoringModel() @@ -369,54 +347,6 @@ def test_callbacks_references_fit_ckpt_path(tmpdir): trainer.fit(model, datamodule=dm, ckpt_path=str(tmpdir / "last.ckpt")) -@RunIf(min_cuda_gpus=2, sklearn=True) -def test_running_test_pretrained_model_distrib_dp(tmpdir): - """Verify `test()` on pretrained model.""" - seed_everything(7) - - dm = ClassifDataModule() - model = CustomClassificationModelDP(lr=0.1) - - # exp file to get meta - logger = tutils.get_default_logger(tmpdir) - - # exp file to get weights - checkpoint = tutils.init_checkpoint_callback(logger) - - trainer_options = dict( - enable_progress_bar=False, - max_epochs=2, - limit_train_batches=5, - limit_val_batches=5, - callbacks=[checkpoint], - logger=logger, - accelerator="gpu", - devices=[0, 1], - strategy="dp", - default_root_dir=tmpdir, - ) - - # fit model - trainer = Trainer(**trainer_options) - trainer.fit(model, datamodule=dm) - - # correct result and ok accuracy - assert trainer.state.finished, f"Training failed with {trainer.state}" - pretrained_model = CustomClassificationModelDP.load_from_checkpoint(trainer.checkpoint_callback.best_model_path) - - # run test set - new_trainer = Trainer(**trainer_options) - new_trainer.test(pretrained_model, datamodule=dm) - pretrained_model.cpu() - - dataloaders = dm.test_dataloader() - if not isinstance(dataloaders, list): - dataloaders = [dataloaders] - - for dataloader in dataloaders: - tpipes.run_model_prediction(pretrained_model, dataloader) - - @RunIf(min_cuda_gpus=2, sklearn=True) def test_running_test_pretrained_model_distrib_ddp_spawn(tmpdir): """Verify `test()` on pretrained model.""" @@ -551,77 +481,6 @@ def test_load_model_from_checkpoint(tmpdir, model_template): new_trainer.test(pretrained_model) -@RunIf(min_cuda_gpus=2, sklearn=True) -def test_dp_resume(tmpdir): - """Make sure DP continues training correctly.""" - model = CustomClassificationModelDP(lr=0.1) - dm = ClassifDataModule() - - trainer_options = dict(max_epochs=1, accelerator="gpu", devices=2, strategy="dp", default_root_dir=tmpdir) - - # get logger - logger = tutils.get_default_logger(tmpdir) - - # exp file to get weights - # logger file to get weights - checkpoint = tutils.init_checkpoint_callback(logger) - - # add these to the trainer options - trainer_options["logger"] = logger - trainer_options["callbacks"] = [checkpoint] - - # fit model - trainer = Trainer(**trainer_options) - trainer.fit(model, datamodule=dm) - - # track epoch before saving - real_global_epoch = trainer.current_epoch - - # correct result and ok accuracy - assert trainer.state.finished, f"Training failed with {trainer.state}" - - # --------------------------- - # HPC LOAD/SAVE - # --------------------------- - # save - # save logger 
to make sure we get all the metrics - if logger: - logger.finalize("finished") - hpc_save_path = trainer._checkpoint_connector.hpc_save_path(tmpdir) - trainer.save_checkpoint(hpc_save_path) - - # init new trainer - new_logger = tutils.get_default_logger(tmpdir, version=logger.version) - trainer_options["logger"] = new_logger - trainer_options["callbacks"] = [ModelCheckpoint(dirpath=tmpdir)] - trainer_options["limit_train_batches"] = 0.5 - trainer_options["limit_val_batches"] = 0.2 - trainer_options["max_epochs"] = 1 - new_trainer = Trainer(**trainer_options) - - class CustomModel(CustomClassificationModelDP): - def __init__(self): - super().__init__() - self.on_train_start_called = False - - def on_train_start(self): - assert self.trainer.current_epoch == real_global_epoch and self.trainer.current_epoch > 0 - - def on_validation_start(self): - dataloader = dm.val_dataloader() - tpipes.run_model_prediction(self.trainer.lightning_module, dataloader=dataloader) - - # new model - model = CustomModel() - - # validate new model which should load hpc weights - new_trainer.validate(model, datamodule=dm, ckpt_path=hpc_save_path) - - # test freeze on gpu - model.freeze() - model.unfreeze() - - def test_model_saving_loading(tmpdir): """Tests use case where trainer saves the model, and user loads it from tags independently.""" model = BoringModel() diff --git a/tests/tests_pytorch/overrides/test_data_parallel.py b/tests/tests_pytorch/overrides/test_data_parallel.py deleted file mode 100644 index 86e45de64868f..0000000000000 --- a/tests/tests_pytorch/overrides/test_data_parallel.py +++ /dev/null @@ -1,205 +0,0 @@ -# Copyright The Lightning AI team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
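The deleted ``CustomClassificationModelDP`` only routed logits through ``validation_step_end`` so the metric could be computed after the DP gather. Without that gather step, the same logging can live directly in ``validation_step``; a sketch of the equivalent, assuming the ``ClassificationModel`` test helper (and its ``valid_acc`` metric) stays as it is today:

.. code-block:: python

    import torch.nn.functional as F

    from tests_pytorch.helpers.simple_models import ClassificationModel


    class ClassificationModelDirectLogging(ClassificationModel):
        # Hypothetical replacement: each process sees its own batch, so the
        # accuracy metric is updated and logged inside validation_step itself.
        def validation_step(self, batch, batch_idx):
            x, y = batch
            logits = self(x)
            self.log("val_acc", self.valid_acc(logits, y))
            return F.cross_entropy(logits, y)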
-from unittest.mock import MagicMock, Mock - -import pytest -import torch -import torch.nn as nn -from torch.nn import DataParallel - -from lightning.pytorch import LightningModule -from lightning.pytorch.demos.boring_classes import BoringModel -from lightning.pytorch.overrides.base import _LightningModuleWrapperBase -from lightning.pytorch.overrides.data_parallel import ( - LightningParallelModule, - python_scalar_to_tensor, - unsqueeze_scalar_tensor, -) -from lightning.pytorch.trainer.states import RunningStage -from tests_pytorch.helpers.runif import RunIf - - -@pytest.mark.parametrize("wrapper_class", [LightningParallelModule, _LightningModuleWrapperBase]) -@pytest.mark.parametrize( - "stage", - [ - ("training", "training_step"), - ("testing", "test_step"), - ("validating", "validation_step"), - ("predicting", "predict_step"), - ], -) -def test_lightning_wrapper_module_methods(wrapper_class, stage): - """Test that the LightningWrapper redirects .forward() to the LightningModule methods.""" - pl_module = Mock(spec=LightningModule) - trainer = Mock() - pl_module._trainer = trainer - wrapped_module = wrapper_class(pl_module) - - batch = torch.rand(5) - batch_idx = 3 - - prop, step = stage - trainer.sanity_checking = False - - for p in ("training", "testing", "validating", "predicting"): - setattr(trainer, p, p == prop) - - wrapped_module(batch, batch_idx) - getattr(pl_module, step).assert_called_with(batch, batch_idx) - - -@pytest.mark.parametrize( - "inp,expected", - [ - [torch.tensor(1.0), torch.tensor([1.0])], - [torch.tensor([2.0]), torch.tensor([2.0])], - [torch.ones(3, 4, 5), torch.ones(3, 4, 5)], - ], -) -def test_unsqueeze_scalar_tensor(inp, expected): - """Test that the utility function unsqueezes only scalar tensors.""" - assert torch.all(unsqueeze_scalar_tensor(inp).eq(expected)) - - -@RunIf(min_cuda_gpus=2) -def test_lightning_parallel_module_unsqueeze_scalar(): - """Test that LightningParallelModule takes care of un-squeezeing 0-dim tensors.""" - - class TestModel(BoringModel): - def training_step(self, batch, batch_idx): - output = super().training_step(batch, batch_idx) - loss = output["loss"] - loss = loss.squeeze() - assert loss.dim() == 0 - # PyTorch usually warns about 0-dim tensors returned in DP - return {"loss": loss} - - model = TestModel() - trainer = MagicMock() - trainer.state.stage = RunningStage.TRAINING - trainer._accelerator_connector._init_deterministic(False) - - model.trainer = trainer - batch = torch.rand(2, 32).cuda() - batch_idx = 0 - - wrapped_model = LightningParallelModule(model).cuda() - dp_module = DataParallel(wrapped_model, device_ids=[0, 1]) - - output = wrapped_model(batch, batch_idx) - assert output["loss"].dim() == 1 - - with pytest.warns(None) as record: - output = dp_module(batch, batch_idx) - - assert output["loss"].dim() == 1 - assert not record - - -@pytest.mark.parametrize( - "inp,expected", [[1.0, torch.tensor([1.0])], [2, torch.tensor([2.0])], [True, torch.tensor([True])]] -) -def test_python_scalar_to_tensor(inp, expected): - assert torch.all(python_scalar_to_tensor(inp).eq(expected)) - - -@RunIf(min_cuda_gpus=1) -@pytest.mark.parametrize("device", [torch.device("cpu"), torch.device("cuda", 0)]) -def test_lightning_parallel_module_python_scalar_conversion(device): - """Test that LightningParallelModule can convert Python scalars to tensors.""" - - class TestModel(BoringModel): - def training_step(self, batch, batch_idx): - output = super().training_step(batch, batch_idx) - # PyTorch DP does not support Python scalars, Lightning 
converts them to tensors - output.update({"python scalar": 12.3}) - return output - - model = TestModel().to(device) - trainer = MagicMock() - trainer.state.stage = RunningStage.TRAINING - trainer._accelerator_connector._init_deterministic(False) - model.trainer = trainer - batch = torch.rand(2, 32).to(device) - batch_idx = 0 - - wrapped_model = LightningParallelModule(model) - output = wrapped_model(batch, batch_idx) - assert output["python scalar"] == torch.tensor([12.3], device=device) - - -@RunIf(min_cuda_gpus=2) -@pytest.mark.parametrize( - "nest, unnest", - [ - (lambda x: x, lambda x: x), - (lambda x: dict(data=x), lambda x: x["data"]), - (lambda x: [x, (x, x)], lambda x: x[1][0]), - ], -) -def test_lightning_parallel_module_device_access(nest, unnest): - """Test that self.device returns the correct value in replicas of DataParallel.""" - - class DeviceAccessModel(LightningModule): - def __init__(self): - super().__init__() - self.layer = nn.Linear(2, 3) - - def training_step(self, batch, batch_idx): - batch = unnest(batch) - assert batch.shape == torch.Size([1, 1]) - assert self.device.index == batch.item() - assert self.device == self.layer.weight.device - return torch.tensor(1, device=self.device) - - pl_module = DeviceAccessModel() - # required for redirecting the forward call to training_step - trainer = Mock() - pl_module.trainer = trainer - trainer.state.stage = RunningStage.TRAINING - - root_device = torch.device("cuda", 0) - wrapped_module = LightningParallelModule(pl_module).to(root_device) - model = DataParallel(wrapped_module, device_ids=[0, 1]) - - data = torch.tensor([0.0, 1.0], device=root_device).view(2, 1) # one value per gpu - data = data.to(root_device) - data = nest(data) - output = model(data, 0) - assert output.device == root_device - assert pl_module.device == root_device - assert torch.all(output.cpu().eq(torch.tensor([1, 1]))) - - -@RunIf(min_cuda_gpus=2) -def test_lightning_parallel_module_device_access_warning(): - """Test that we show a warning when the device can't be inferred from the input.""" - - class DeviceAccessModel(LightningModule): - def training_step(self, batch, batch_idx): - pass - - pl_module = DeviceAccessModel() - # required for redirecting the forward call to training_step - trainer = Mock() - pl_module.trainer = trainer - trainer.state.stage = RunningStage.TRAINING - - wrapped_module = LightningParallelModule(pl_module).cuda() - model = DataParallel(wrapped_module, device_ids=[0, 1]) - - data = dict(x=1) # contains no tensors - with pytest.warns(UserWarning, match="Could not determine on which device the inputs are."): - _ = model(data, 0) diff --git a/tests/tests_pytorch/strategies/test_common.py b/tests/tests_pytorch/strategies/test_common.py index 5f9f1bf6789a4..b14712d14b17d 100644 --- a/tests/tests_pytorch/strategies/test_common.py +++ b/tests/tests_pytorch/strategies/test_common.py @@ -18,14 +18,13 @@ from lightning.pytorch.demos.boring_classes import BoringModel from tests_pytorch.helpers.datamodules import ClassifDataModule from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.strategies.test_dp import CustomClassificationModelDP +from tests_pytorch.helpers.simple_models import ClassificationModel @pytest.mark.parametrize( "trainer_kwargs", ( pytest.param(dict(accelerator="gpu", devices=1), marks=RunIf(min_cuda_gpus=1)), - pytest.param(dict(strategy="dp", accelerator="gpu", devices=2), marks=RunIf(min_cuda_gpus=2)), pytest.param(dict(strategy="ddp_spawn", accelerator="gpu", devices=2), marks=RunIf(min_cuda_gpus=2)), 
pytest.param(dict(accelerator="mps", devices=1), marks=RunIf(mps=True)), ), @@ -33,7 +32,7 @@ @RunIf(sklearn=True) def test_evaluate(tmpdir, trainer_kwargs): dm = ClassifDataModule() - model = CustomClassificationModelDP() + model = ClassificationModel() trainer = Trainer( default_root_dir=tmpdir, max_epochs=2, limit_train_batches=10, limit_val_batches=10, **trainer_kwargs ) diff --git a/tests/tests_pytorch/strategies/test_dp.py b/tests/tests_pytorch/strategies/test_dp.py deleted file mode 100644 index c1216a2613ff9..0000000000000 --- a/tests/tests_pytorch/strategies/test_dp.py +++ /dev/null @@ -1,205 +0,0 @@ -# Copyright The Lightning AI team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch -import torch.nn.functional as F -from torch.utils.data import DataLoader - -import lightning.pytorch as pl -import tests_pytorch.helpers.pipelines as tpipes -from lightning.pytorch.callbacks import EarlyStopping -from lightning.pytorch.demos.boring_classes import BoringModel, RandomDataset -from tests_pytorch.helpers.datamodules import ClassifDataModule -from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.helpers.simple_models import ClassificationModel - - -class CustomClassificationModelDP(ClassificationModel): - def _step(self, batch, batch_idx): - x, y = batch - logits = self(x) - return {"logits": logits, "y": y} - - def training_step(self, batch, batch_idx): - out = self._step(batch, batch_idx) - loss = F.cross_entropy(out["logits"], out["y"]) - return loss - - def validation_step(self, batch, batch_idx): - return self._step(batch, batch_idx) - - def test_step(self, batch, batch_idx): - return self._step(batch, batch_idx) - - def validation_step_end(self, outputs): - self.log("val_acc", self.valid_acc(outputs["logits"], outputs["y"])) - - def test_step_end(self, outputs): - self.log("test_acc", self.test_acc(outputs["logits"], outputs["y"])) - - -@RunIf(min_cuda_gpus=2, sklearn=True) -def test_multi_gpu_early_stop_dp(tmpdir): - """Make sure DDP works. 
- - with early stopping - """ - dm = ClassifDataModule() - model = CustomClassificationModelDP() - - trainer_options = dict( - default_root_dir=tmpdir, - callbacks=[EarlyStopping(monitor="val_acc")], - max_epochs=50, - limit_train_batches=10, - limit_val_batches=10, - accelerator="gpu", - devices=[0, 1], - strategy="dp", - ) - - tpipes.run_model_test(trainer_options, model, dm) - - -@RunIf(min_cuda_gpus=2) -def test_multi_gpu_model_dp(tmpdir): - trainer_options = dict( - default_root_dir=tmpdir, - max_epochs=1, - limit_train_batches=10, - limit_val_batches=10, - accelerator="gpu", - devices=[0, 1], - strategy="dp", - enable_progress_bar=False, - ) - - model = BoringModel() - - tpipes.run_model_test(trainer_options, model) - - -class ReductionTestModel(BoringModel): - def __init__(self): - super().__init__() - self.train_outputs = [] - self.val_outputs = [] - self.test_outputs = [] - - def train_dataloader(self): - return DataLoader(RandomDataset(32, 64), batch_size=2) - - def val_dataloader(self): - return DataLoader(RandomDataset(32, 64), batch_size=2) - - def test_dataloader(self): - return DataLoader(RandomDataset(32, 64), batch_size=2) - - def add_outputs(self, output, device): - output.update( - { - "reduce_int": torch.tensor(device.index, dtype=torch.int, device=device), - "reduce_float": torch.tensor(device.index, dtype=torch.float, device=device), - } - ) - - def training_step(self, batch, batch_idx): - output = super().training_step(batch, batch_idx) - self.add_outputs(output, batch.device) - return output - - def validation_step(self, batch, batch_idx): - output = super().validation_step(batch, batch_idx) - self.add_outputs(output, batch.device) - return output - - def test_step(self, batch, batch_idx): - output = super().test_step(batch, batch_idx) - self.add_outputs(output, batch.device) - return output - - def training_step_end(self, training_step_output): - # the strategy does this automatically, but since we want to store these in memory, we need to manually do it - # so that we can append the reduced value and not the per-rank value - training_step_output["loss"] = self.trainer.strategy.reduce(training_step_output["loss"]) - self.train_outputs.append(training_step_output) - # return this or the DP strategy will reduce again - return training_step_output - - def validation_step_end(self, validation_step_output): - self.val_outputs.append(validation_step_output) - # returning a value is not necessary because there's no modification - - def test_step_end(self, test_step_output): - self.test_outputs.append(test_step_output) - - def on_train_epoch_end(self): - assert self.train_outputs[0]["loss"].shape == torch.Size([]) - self._assert_extra_outputs(self.train_outputs) - - def on_validation_epoch_end(self): - assert self.val_outputs[0]["x"].shape == torch.Size([2]) - self._assert_extra_outputs(self.val_outputs) - - def on_test_epoch_end(self): - assert self.test_outputs[0]["y"].shape == torch.Size([2]) - self._assert_extra_outputs(self.test_outputs) - - def _assert_extra_outputs(self, outputs): - out = outputs[0]["reduce_int"] - assert torch.eq(out, torch.tensor([0, 1], device="cuda:0")).all() - assert out.dtype is torch.int - - out = outputs[0]["reduce_float"] - assert torch.eq(out, torch.tensor([0.0, 1.0], device="cuda:0")).all() - assert out.dtype is torch.float - - -@RunIf(min_cuda_gpus=2) -def test_dp_training_step_dict(tmpdir): - """This test verifies that dp properly reduces dictionaries.""" - model = ReductionTestModel() - - trainer = pl.Trainer( - 
default_root_dir=tmpdir,
-        fast_dev_run=True,
-        accelerator="gpu",
-        devices=2,
-        strategy="dp",
-    )
-    trainer.fit(model)
-    trainer.test(model)
-
-
-@RunIf(min_cuda_gpus=2)
-def test_dp_batch_not_moved_to_device_explicitly(tmpdir):
-    """Test that with DP, batch is not moved to the device explicitly."""
-
-    class CustomModel(BoringModel):
-        def on_train_batch_start(self, batch, *args, **kargs):
-            assert not batch.is_cuda
-
-        def training_step(self, batch, batch_idx):
-            assert batch.is_cuda
-            return super().training_step(batch, batch_idx)
-
-    trainer = pl.Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=True,
-        accelerator="gpu",
-        devices=2,
-        strategy="dp",
-    )
-
-    trainer.fit(CustomModel())
diff --git a/tests/tests_pytorch/trainer/connectors/test_accelerator_connector.py b/tests/tests_pytorch/trainer/connectors/test_accelerator_connector.py
index 55397e708d6fe..607487696f11b 100644
--- a/tests/tests_pytorch/trainer/connectors/test_accelerator_connector.py
+++ b/tests/tests_pytorch/trainer/connectors/test_accelerator_connector.py
@@ -37,7 +37,6 @@
 from lightning.pytorch.plugins import DoublePrecisionPlugin, LayerSync, PrecisionPlugin, TorchSyncBatchNorm
 from lightning.pytorch.plugins.io import TorchCheckpointIO
 from lightning.pytorch.strategies import (
-    DataParallelStrategy,
     DDPSpawnStrategy,
     DDPStrategy,
     DeepSpeedStrategy,
@@ -246,16 +245,6 @@ def test_interactive_incompatible_backend_error(cuda_count_2, monkeypatch):
     with pytest.raises(MisconfigurationException, match=r"strategy='ddp_spawn'\)`.*is not compatible"):
         Trainer(strategy="ddp_spawn", accelerator="gpu", devices=2)
 
-    with pytest.raises(MisconfigurationException, match=r"strategy='ddp'\)`.*is not compatible"):
-        # Edge case: AcceleratorConnector maps dp to ddp if accelerator != gpu
-        Trainer(strategy="dp")
-
-
-def test_interactive_compatible_dp_strategy_gpu(mps_count_0, cuda_count_2, monkeypatch):
-    monkeypatch.setattr(lightning.pytorch.trainer.connectors.accelerator_connector, "_IS_INTERACTIVE", True)
-    trainer = Trainer(strategy="dp", accelerator="gpu")
-    assert trainer.strategy.launcher is None
-
 
 @RunIf(skip_windows=True)
 def test_interactive_compatible_strategy_tpu(tpu_available, monkeypatch):
@@ -351,12 +340,6 @@ def test_set_devices_if_none_cpu():
     assert trainer.num_devices == 3
 
 
-def test_unsupported_strategy_types_on_cpu_and_fallback():
-    with pytest.warns(UserWarning, match="is not supported on CPUs, hence setting `strategy='ddp"):
-        trainer = Trainer(accelerator="cpu", strategy="dp", devices=2)
-    assert isinstance(trainer.strategy, DDPStrategy)
-
-
 @pytest.mark.parametrize(
     ["strategy", "strategy_class"],
     (
@@ -366,7 +349,6 @@ def test_unsupported_strategy_types_on_cpu_and_fallback():
         ("ddp", DDPStrategy),
         ("ddp_find_unused_parameters_false", DDPStrategy),
         ("ddp_find_unused_parameters_true", DDPStrategy),
-        ("dp", DataParallelStrategy),
         pytest.param("deepspeed", DeepSpeedStrategy, marks=RunIf(deepspeed=True)),
     ),
 )
@@ -406,7 +388,6 @@ def test_strategy_choice_cpu_instance(strategy_class):
         ("ddp_spawn_find_unused_parameters_false", DDPSpawnStrategy),
         ("ddp", DDPStrategy),
         ("ddp_find_unused_parameters_false", DDPStrategy),
-        ("dp", DataParallelStrategy),
         pytest.param("deepspeed", DeepSpeedStrategy, marks=RunIf(deepspeed=True)),
     ],
 )
diff --git a/tests/tests_pytorch/trainer/logging_/test_logger_connector.py b/tests/tests_pytorch/trainer/logging_/test_logger_connector.py
index 73caa92ae7f54..fda6dc3f21331 100644
--- a/tests/tests_pytorch/trainer/logging_/test_logger_connector.py
+++ b/tests/tests_pytorch/trainer/logging_/test_logger_connector.py
@@ -31,7 +31,6 @@
 from lightning.pytorch.trainer.connectors.logger_connector.result import _ResultCollection
 from lightning.pytorch.utilities.exceptions import MisconfigurationException
 from lightning.pytorch.utilities.imports import _TORCHMETRICS_GREATER_EQUAL_0_9_1
-from tests_pytorch.helpers.runif import RunIf
 from tests_pytorch.models.test_hooks import get_members
 
 
@@ -261,63 +260,6 @@ def test_fx_validator_integration(tmpdir):
     trainer.predict(model)
 
 
-@RunIf(min_cuda_gpus=2)
-def test_epoch_results_cache_dp(tmpdir):
-
-    root_device = torch.device("cuda", 0)
-
-    class TestModel(BoringModel):
-        def training_step(self, *args, **kwargs):
-            result = super().training_step(*args, **kwargs)
-            self.log("train_loss_epoch", result["loss"], on_step=False, on_epoch=True)
-            return result
-
-        def training_step_end(self, training_step_output):  # required for dp
-            loss = training_step_output["loss"].mean()
-            return loss
-
-        def on_train_epoch_end(self):
-            assert self.trainer.callback_metrics["train_loss_epoch"].device == root_device
-
-        def validation_step(self, *args, **kwargs):
-            val_loss = torch.rand(1, device=torch.device("cuda", 1))
-            self.log("val_loss_epoch", val_loss, on_step=False, on_epoch=True)
-            return val_loss
-
-        def on_validation_epoch_end(self):
-            assert self.trainer.callback_metrics["val_loss_epoch"].device == root_device
-
-        def test_step(self, *args, **kwargs):
-            test_loss = torch.rand(1, device=torch.device("cuda", 1))
-            self.log("test_loss_epoch", test_loss, on_step=False, on_epoch=True)
-            return test_loss
-
-        def on_test_epoch_end(self):
-            assert self.trainer.callback_metrics["test_loss_epoch"].device == root_device
-
-        def train_dataloader(self):
-            return DataLoader(RandomDataset(32, 64), batch_size=4)
-
-        def val_dataloader(self):
-            return DataLoader(RandomDataset(32, 64), batch_size=4)
-
-        def test_dataloader(self):
-            return DataLoader(RandomDataset(32, 64), batch_size=4)
-
-    model = TestModel()
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        strategy="dp",
-        accelerator="gpu",
-        devices=2,
-        limit_train_batches=2,
-        limit_val_batches=2,
-        max_epochs=1,
-    )
-    trainer.fit(model)
-    trainer.test(model)
-
-
 @pytest.mark.parametrize("add_dataloader_idx", [False, True])
 def test_auto_add_dataloader_idx(tmpdir, add_dataloader_idx):
     """test that auto_add_dataloader_idx argument works."""
diff --git a/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py b/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py
index b65900a213fa9..f3e5c6daf3b3d 100644
--- a/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py
+++ b/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py
@@ -111,7 +111,6 @@ def test_num_stepping_batches_accumulate_gradients(accumulate_grad_batches, expe
         ({"strategy": "ddp", "num_nodes": 2}, 5),
         ({"strategy": "ddp", "num_nodes": 3}, 4),
         ({"strategy": "ddp", "num_nodes": 4}, 3),
-        ({"strategy": "dp"}, 64),
     ],
 )
 def test_num_stepping_batches_gpu(trainer_kwargs, estimated_steps, monkeypatch):
diff --git a/tests/tests_pytorch/trainer/test_config_validator.py b/tests/tests_pytorch/trainer/test_config_validator.py
index d25e4f20ee14c..24425992b9dd5 100644
--- a/tests/tests_pytorch/trainer/test_config_validator.py
+++ b/tests/tests_pytorch/trainer/test_config_validator.py
@@ -25,7 +25,6 @@
     __verify_train_val_loop_configuration,
 )
 from lightning.pytorch.utilities.exceptions import MisconfigurationException
-from tests_pytorch.conftest import mock_cuda_count
 
 
 def test_wrong_train_setting(tmpdir):
@@ -141,14 +140,11 @@ def test_trainer_manual_optimization_config():
         trainer.fit(model)
 
 
-@pytest.mark.parametrize("trainer_kwargs", [{"accelerator": "ipu"}, {"accelerator": "gpu", "strategy": "dp"}])
+@pytest.mark.parametrize("trainer_kwargs", [{"accelerator": "ipu"}])
 @pytest.mark.parametrize("hook", ["transfer_batch_to_device", "on_after_batch_transfer"])
 def test_raise_exception_with_batch_transfer_hooks(monkeypatch, hook, trainer_kwargs, tmpdir):
     """Test that an exception is raised when overriding batch_transfer_hooks."""
-    if trainer_kwargs.get("accelerator") == "gpu":
-        match_pattern = rf"Overriding `{hook}` is not .* in DP mode."
-        mock_cuda_count(monkeypatch, 2)
-    elif trainer_kwargs.get("accelerator") == "ipu":
+    if trainer_kwargs.get("accelerator") == "ipu":
         match_pattern = rf"Overriding `{hook}` is not .* with IPUs"
         monkeypatch.setattr(pl.accelerators.ipu.IPUAccelerator, "is_available", lambda: True)
         monkeypatch.setattr(pl.strategies.ipu, "_IPU_AVAILABLE", lambda: True)
diff --git a/tests/tests_pytorch/trainer/test_trainer.py b/tests/tests_pytorch/trainer/test_trainer.py
index 4876af26b9d11..8d7b044d24a12 100644
--- a/tests/tests_pytorch/trainer/test_trainer.py
+++ b/tests/tests_pytorch/trainer/test_trainer.py
@@ -50,7 +50,7 @@
 )
 from lightning.pytorch.loggers import TensorBoardLogger
 from lightning.pytorch.overrides.distributed import IndexBatchSamplerWrapper, UnrepeatedDistributedSampler
-from lightning.pytorch.strategies import DataParallelStrategy, DDPSpawnStrategy, DDPStrategy, SingleDeviceStrategy
+from lightning.pytorch.strategies import DDPSpawnStrategy, DDPStrategy, SingleDeviceStrategy
 from lightning.pytorch.trainer.states import RunningStage, TrainerFn
 from lightning.pytorch.utilities.exceptions import MisconfigurationException
 from lightning.pytorch.utilities.imports import _OMEGACONF_AVAILABLE
@@ -1389,8 +1389,6 @@ def test_trainer_predict_cpu(tmpdir, datamodule, enable_progress_bar):
 @pytest.mark.parametrize(
     "kwargs",
     [
-        {"strategy": "dp", "devices": 1},
-        {"strategy": "dp", "devices": 2},
         {"strategy": "ddp", "devices": 2},
     ],
 )
@@ -1883,7 +1881,6 @@ def training_step(self, batch, batch_idx):
     ["trainer_kwargs", "strategy_cls", "strategy_name", "accelerator_cls", "devices"],
     [
         ({"strategy": None}, SingleDeviceStrategy, "single_device", CPUAccelerator, 1),
-        pytest.param({"strategy": "dp"}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
         pytest.param({"strategy": "ddp"}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
         pytest.param(
             {"strategy": "ddp", "num_nodes": 2}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)
@@ -1895,7 +1892,6 @@ def training_step(self, batch, batch_idx):
             CUDAAccelerator,
             1,
         ),
-        ({"strategy": "dp", "accelerator": "cuda", "devices": 1}, DataParallelStrategy, "dp", CUDAAccelerator, 1),
         ({"strategy": "ddp", "accelerator": "cuda", "devices": 1}, DDPStrategy, "ddp", CUDAAccelerator, 1),
         (
             {"strategy": "ddp_spawn", "accelerator": "cuda", "devices": 1},
@@ -1905,7 +1901,6 @@ def training_step(self, batch, batch_idx):
             1,
         ),
         ({"strategy": None, "accelerator": "cuda", "devices": 2}, DDPSpawnStrategy, "ddp_spawn", CUDAAccelerator, 2),
-        ({"strategy": "dp", "accelerator": "cuda", "devices": 2}, DataParallelStrategy, "dp", CUDAAccelerator, 2),
         ({"strategy": "ddp", "accelerator": "cuda", "devices": 2}, DDPStrategy, "ddp", CUDAAccelerator, 2),
         ({"strategy": "ddp", "accelerator": "cpu", "devices": 2}, DDPStrategy, "ddp", CPUAccelerator, 2),
         (
@@ -1938,13 +1933,6 @@ def training_step(self, batch, batch_idx):
         ),
         pytest.param({"strategy": DDPStrategy()}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
         ({"strategy": DDPStrategy(), "accelerator": "cuda", "devices": 2}, DDPStrategy, "ddp", CUDAAccelerator, 2),
-        (
-            {"strategy": DataParallelStrategy(), "accelerator": "cuda", "devices": 2},
-            DataParallelStrategy,
-            "dp",
-            CUDAAccelerator,
-            2,
-        ),
         (
             {"strategy": "ddp_spawn", "accelerator": "cuda", "devices": 2, "num_nodes": 2},
             DDPSpawnStrategy,
diff --git a/tests/tests_pytorch/utilities/test_compile.py b/tests/tests_pytorch/utilities/test_compile.py
index fdd39ef1027d7..f0da1cb289f82 100644
--- a/tests/tests_pytorch/utilities/test_compile.py
+++ b/tests/tests_pytorch/utilities/test_compile.py
@@ -14,6 +14,7 @@
 
 import pytest
 import torch
+from lightning_utilities.core import module_available
 
 from lightning.pytorch import LightningModule, Trainer
 from lightning.pytorch.demos.boring_classes import BoringModel
@@ -58,11 +59,12 @@ def test_trainer_compiled_model(tmp_path, monkeypatch):
     assert trainer.model._compiler_ctx is None
 
     # some strategies do not support it
-    compiled_model = torch.compile(model)
-    mock_cuda_count(monkeypatch, 1)
-    trainer = Trainer(strategy="dp", accelerator="cuda", **trainer_kwargs)
-    with pytest.raises(RuntimeError, match="Using a compiled model is incompatible with the current strategy.*"):
-        trainer.fit(compiled_model)
+    if module_available("deepspeed"):
+        compiled_model = torch.compile(model)
+        mock_cuda_count(monkeypatch, 2)
+        trainer = Trainer(strategy="deepspeed", accelerator="cuda", **trainer_kwargs)
+        with pytest.raises(RuntimeError, match="Using a compiled model is incompatible with the current strategy.*"):
+            trainer.fit(compiled_model)
 
     # ddp does
     trainer = Trainer(strategy="ddp", **trainer_kwargs)
diff --git a/tests/tests_pytorch/utilities/test_dtype_device_mixin.py b/tests/tests_pytorch/utilities/test_dtype_device_mixin.py
index 36ac3a25f12ff..c35261944f8ce 100644
--- a/tests/tests_pytorch/utilities/test_dtype_device_mixin.py
+++ b/tests/tests_pytorch/utilities/test_dtype_device_mixin.py
@@ -44,20 +44,6 @@ def on_train_batch_start(self, trainer, model, batch, batch_idx):
         assert model.device == model.module.module.device
 
 
-@RunIf(min_cuda_gpus=2)
-def test_submodules_multi_gpu_dp(tmpdir):
-    model = TopModule()
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        strategy="dp",
-        accelerator="gpu",
-        devices=2,
-        callbacks=[DeviceAssertCallback()],
-        max_steps=1,
-    )
-    trainer.fit(model)
-
-
 @RunIf(min_cuda_gpus=2)
 def test_submodules_multi_gpu_ddp_spawn(tmpdir):
     model = TopModule()