Merged
5 changes: 1 addition & 4 deletions src/lightning_lite/strategies/deepspeed.py
@@ -33,7 +33,7 @@
from lightning_lite.plugins.precision.utils import _fp_to_half
from lightning_lite.strategies.ddp import DDPStrategy
from lightning_lite.utilities.apply_func import apply_to_collection
from lightning_lite.utilities.distributed import get_default_process_group_backend_for_device, log
from lightning_lite.utilities.distributed import log
from lightning_lite.utilities.enums import AMPType, PrecisionType
from lightning_lite.utilities.rank_zero import rank_zero_info
from lightning_lite.utilities.seed import reset_seed
@@ -450,9 +450,6 @@ def _init_deepspeed_distributed(self) -> None:
self._process_group_backend = self._get_process_group_backend()
deepspeed.init_distributed(self._process_group_backend, distributed_port=self.cluster_environment.main_port)

def _get_process_group_backend(self) -> str:
return self._process_group_backend or get_default_process_group_backend_for_device(self.root_device)

def _set_node_environment_variables(self) -> None:
assert self.cluster_environment is not None
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
12 changes: 0 additions & 12 deletions src/lightning_lite/utilities/distributed.py
@@ -3,7 +3,6 @@
from typing import Any, List, Optional, Tuple, Union

import torch
from lightning_utilities.core.rank_zero import rank_zero_deprecation
from torch import Tensor
from torch.nn import functional as F

@@ -251,14 +250,3 @@ def tpu_distributed() -> bool:

def get_default_process_group_backend_for_device(device: torch.device) -> str:
return "nccl" if device.type == "cuda" else "gloo"


def _get_process_group_backend_from_env() -> Optional[str]:
torch_backend = os.getenv("PL_TORCH_DISTRIBUTED_BACKEND")
if torch_backend is not None:
rank_zero_deprecation(
"Environment variable `PL_TORCH_DISTRIBUTED_BACKEND`"
" was deprecated in v1.6 and will be removed in v1.8."
" Specify `process_group_backend` directly on the strategy constructor."
)
return torch_backend
4 changes: 4 additions & 0 deletions src/pytorch_lightning/CHANGELOG.md
@@ -171,6 +171,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Removed the deprecated `BaseProfiler` and `AbstractProfiler` classes ([#14404](https://github.com/Lightning-AI/lightning/pull/14404))


- Removed the deprecated way to set the distributed backend via the environment variable `PL_TORCH_DISTRIBUTED_BACKEND`, in favor of setting the `process_group_backend` in the strategy constructor ([#14693](https://github.com/Lightning-AI/lightning/pull/14693))
Contributor comment:

I personally would have kept it but otherwise, LGTM!

### Fixed

- Break HPU Graphs into two parts (forward + backward as one and optimizer as another) for better performance ([#14656](https://github.com/Lightning-AI/lightning/pull/14656))
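As context for the changelog entry above, here is a minimal sketch of the migration it describes, assuming a typical two-process DDP run; the model and the choice of `"gloo"` are placeholders, not from this PR:

```python
# Before (removed by this PR): the backend was picked up from the environment,
#   PL_TORCH_DISTRIBUTED_BACKEND=gloo python train.py
# After: pass the backend explicitly to the strategy constructor.
from pytorch_lightning import Trainer
from pytorch_lightning.strategies import DDPStrategy

trainer = Trainer(
    accelerator="gpu",
    devices=2,
    strategy=DDPStrategy(process_group_backend="gloo"),  # omit to get "nccl" on CUDA, "gloo" on CPU
)
```
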
12 changes: 2 additions & 10 deletions src/pytorch_lightning/strategies/ddp.py
@@ -31,11 +31,7 @@
import pytorch_lightning as pl
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.plugins.io.checkpoint_plugin import CheckpointIO
from lightning_lite.utilities.distributed import (
_get_process_group_backend_from_env,
distributed_available,
get_default_process_group_backend_for_device,
)
from lightning_lite.utilities.distributed import distributed_available, get_default_process_group_backend_for_device
from lightning_lite.utilities.distributed import group as _group
from lightning_lite.utilities.distributed import init_dist_connection, ReduceOp, sync_ddp_if_available
from lightning_lite.utilities.optimizer import optimizers_to_device
@@ -213,11 +209,7 @@ def setup_distributed(self) -> None:
init_dist_connection(self.cluster_environment, self._process_group_backend, timeout=self._timeout)

def _get_process_group_backend(self) -> str:
return (
self._process_group_backend
or _get_process_group_backend_from_env()
or get_default_process_group_backend_for_device(self.root_device)
)
return self._process_group_backend or get_default_process_group_backend_for_device(self.root_device)

def set_world_ranks(self) -> None:
if self.cluster_environment is None:
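The `_get_process_group_backend` simplification above drops the environment-variable lookup entirely, leaving a two-step resolution: explicit constructor value, then the device default. A rough standalone sketch of that order (the `_resolve_backend` helper name is illustrative, not part of the PR):

```python
from typing import Optional

import torch


def _resolve_backend(process_group_backend: Optional[str], root_device: torch.device) -> str:
    # Mirrors the simplified DDPStrategy._get_process_group_backend():
    # an explicit constructor value wins; otherwise fall back to the device default.
    return process_group_backend or ("nccl" if root_device.type == "cuda" else "gloo")


assert _resolve_backend(None, torch.device("cpu")) == "gloo"
assert _resolve_backend(None, torch.device("cuda", 0)) == "nccl"
assert _resolve_backend("gloo", torch.device("cuda", 0)) == "gloo"
```
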
12 changes: 2 additions & 10 deletions src/pytorch_lightning/strategies/ddp_spawn.py
@@ -26,11 +26,7 @@
import pytorch_lightning as pl
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.plugins.io.checkpoint_plugin import CheckpointIO
from lightning_lite.utilities.distributed import (
_get_process_group_backend_from_env,
distributed_available,
get_default_process_group_backend_for_device,
)
from lightning_lite.utilities.distributed import distributed_available, get_default_process_group_backend_for_device
from lightning_lite.utilities.distributed import group as _group
from lightning_lite.utilities.distributed import init_dist_connection, ReduceOp, sync_ddp_if_available
from lightning_lite.utilities.optimizer import optimizers_to_device
@@ -187,11 +183,7 @@ def _worker_setup(self, process_idx: int) -> None:
)

def _get_process_group_backend(self) -> str:
return (
self._process_group_backend
or _get_process_group_backend_from_env()
or get_default_process_group_backend_for_device(self.root_device)
)
return self._process_group_backend or get_default_process_group_backend_for_device(self.root_device)

def pre_configure_ddp(self) -> None:
# if unset, default `find_unused_parameters` `True`
13 changes: 1 addition & 12 deletions src/pytorch_lightning/strategies/deepspeed.py
@@ -32,11 +32,6 @@
import pytorch_lightning as pl
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.plugins.precision.utils import _fp_to_half
from lightning_lite.utilities.distributed import (
_get_process_group_backend_from_env,
get_default_process_group_backend_for_device,
log,
)
from lightning_lite.utilities.enums import AMPType, PrecisionType
from lightning_lite.utilities.optimizer import optimizers_to_device
from lightning_lite.utilities.seed import reset_seed
@@ -53,6 +48,7 @@
from pytorch_lightning.utilities.rank_zero import rank_zero_deprecation, rank_zero_info, rank_zero_warn
from pytorch_lightning.utilities.types import LRSchedulerConfig, STEP_OUTPUT

log = logging.getLogger(__name__)
warning_cache = WarningCache()

_DEEPSPEED_AVAILABLE = RequirementCache("deepspeed")
@@ -395,13 +391,6 @@ def _init_deepspeed_distributed(self) -> None:
self._process_group_backend = self._get_process_group_backend()
deepspeed.init_distributed(self._process_group_backend, distributed_port=self.cluster_environment.main_port)

def _get_process_group_backend(self) -> str:
return (
self._process_group_backend
or _get_process_group_backend_from_env()
or get_default_process_group_backend_for_device(self.root_device)
)

def _set_node_environment_variables(self) -> None:
assert self.cluster_environment is not None
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
11 changes: 2 additions & 9 deletions src/pytorch_lightning/strategies/fully_sharded_native.py
@@ -21,10 +21,7 @@
import pytorch_lightning as pl
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.plugins.io.checkpoint_plugin import CheckpointIO
from lightning_lite.utilities.distributed import (
_get_process_group_backend_from_env,
get_default_process_group_backend_for_device,
)
from lightning_lite.utilities.distributed import get_default_process_group_backend_for_device
from lightning_lite.utilities.distributed import group as _group
from lightning_lite.utilities.distributed import init_dist_connection, ReduceOp, sync_ddp_if_available
from lightning_lite.utilities.optimizer import optimizers_to_device
@@ -188,11 +185,7 @@ def setup_environment(self) -> None:
super().setup_environment()

def _get_process_group_backend(self) -> str:
return (
self._process_group_backend
or _get_process_group_backend_from_env()
or get_default_process_group_backend_for_device(self.root_device)
)
return self._process_group_backend or get_default_process_group_backend_for_device(self.root_device)

def set_world_ranks(self) -> None:
if self.cluster_environment is None:
19 changes: 1 addition & 18 deletions src/pytorch_lightning/strategies/parallel.py
@@ -21,16 +21,10 @@
import pytorch_lightning as pl
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.plugins.io.checkpoint_plugin import CheckpointIO
from lightning_lite.utilities.distributed import (
_get_process_group_backend_from_env,
all_gather_ddp_if_available,
get_default_process_group_backend_for_device,
ReduceOp,
)
from lightning_lite.utilities.distributed import all_gather_ddp_if_available, ReduceOp
from pytorch_lightning.plugins import LayerSync
from pytorch_lightning.plugins.precision import PrecisionPlugin
from pytorch_lightning.strategies.strategy import Strategy
from pytorch_lightning.utilities.rank_zero import rank_zero_deprecation


class ParallelStrategy(Strategy, ABC):
@@ -89,17 +83,6 @@ def distributed_sampler_kwargs(self) -> Dict[str, Any]:
)
return distributed_sampler_kwargs

@property
def torch_distributed_backend(self) -> str:
"""Deprecated property."""
rank_zero_deprecation(
"ParallelStrategy.torch_distributed_backend was deprecated in v1.6 and will be removed in v1.8."
)
pg_backend = _get_process_group_backend_from_env()
if pg_backend:
return pg_backend
return get_default_process_group_backend_for_device(self.root_device)

def reconciliate_processes(self, trace: str) -> None:
"""Function to re-conciliate processes on failure."""

46 changes: 0 additions & 46 deletions tests/tests_pytorch/deprecated_api/test_remove_1-8.py
@@ -12,14 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test deprecated functionality which will be removed in v1.8.0."""
import os
import time
from unittest import mock
from unittest.mock import Mock

import numpy as np
import pytest
import torch

import pytorch_lightning
from lightning_lite.utilities import device_parser
@@ -29,7 +27,6 @@
from pytorch_lightning.loggers import CSVLogger, Logger
from pytorch_lightning.plugins.precision.precision_plugin import PrecisionPlugin
from pytorch_lightning.profilers import AdvancedProfiler, SimpleProfiler
from pytorch_lightning.strategies import ParallelStrategy
from pytorch_lightning.strategies.ipu import LightningIPUModule
from pytorch_lightning.trainer.configuration_validator import _check_datamodule_checkpoint_hooks
from pytorch_lightning.trainer.states import RunningStage
@@ -509,49 +506,6 @@ def test_v1_8_0_lightning_module_use_amp():
model.use_amp = False


@mock.patch.dict(os.environ, {"PL_TORCH_DISTRIBUTED_BACKEND": "foo"})
def test_v1_8_0_torch_distributed_backend_env():
from lightning_lite.utilities.distributed import _get_process_group_backend_from_env

with pytest.deprecated_call(
match="Environment variable `PL_TORCH_DISTRIBUTED_BACKEND`"
" was deprecated in v1.6 and will be removed in v1.8."
):
_get_process_group_backend_from_env()


def test_parallel_strategy_torch_distributed_backend():
class CustomParallel(ParallelStrategy):
@property
def root_device(self) -> torch.device:
return torch.device("cpu")

def model_to_device(self):
pass

@property
def is_global_zero(self):
return True

def broadcast(self, obj):
return obj

def reduce(self, tensor):
return tensor

def barrier(self):
return

def all_gather(self, tensor):
return tensor

strategy = CustomParallel()
with pytest.deprecated_call(
match="ParallelStrategy.torch_distributed_backend was deprecated" " in v1.6 and will be removed in v1.8."
):
strategy.torch_distributed_backend


def test_trainer_config_device_ids():
trainer = Trainer(devices=2)
with pytest.deprecated_call(
70 changes: 30 additions & 40 deletions tests/tests_pytorch/strategies/test_ddp.py
@@ -13,7 +13,6 @@
# limitations under the License.
import os
from unittest import mock
from unittest.mock import patch

import pytest
import torch
@@ -59,34 +58,29 @@ def test_multi_gpu_model_ddp_fit_test(tmpdir):


@RunIf(skip_windows=True)
@pytest.mark.skipif(torch.cuda.is_available(), reason="test doesn't require a GPU machine")
@mock.patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1", "WORLD_SIZE": "2"}, clear=True)
@mock.patch("lightning_lite.utilities.device_parser.is_cuda_available", return_value=True)
def test_torch_distributed_backend_env_variables(tmpdir):
@mock.patch("lightning_lite.utilities.device_parser.num_cuda_devices", return_value=2)
def test_torch_distributed_backend_invalid(_, __, tmpdir):
"""This test set `undefined` as torch backend and should raise an `Backend.UNDEFINED` ValueError."""
_environ = {"PL_TORCH_DISTRIBUTED_BACKEND": "undefined", "CUDA_VISIBLE_DEVICES": "0,1", "WORLD_SIZE": "2"}
with patch.dict(os.environ, _environ), patch(
"lightning_lite.utilities.device_parser.num_cuda_devices", return_value=2
):
with pytest.deprecated_call(match="Environment variable `PL_TORCH_DISTRIBUTED_BACKEND` was deprecated in v1.6"):
with pytest.raises(ValueError, match="Invalid backend: 'undefined'"):
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
strategy="ddp",
accelerator="gpu",
devices=2,
logger=False,
)
trainer.fit(model)
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
strategy=DDPStrategy(process_group_backend="undefined"),
accelerator="cuda",
devices=2,
logger=False,
)
with pytest.raises(ValueError, match="Invalid backend: 'undefined'"):
trainer.fit(model)


@RunIf(skip_windows=True)
@mock.patch("torch.cuda.set_device")
@mock.patch("lightning_lite.utilities.device_parser.is_cuda_available", return_value=True)
@mock.patch("lightning_lite.utilities.device_parser.num_cuda_devices", return_value=1)
@mock.patch("pytorch_lightning.accelerators.gpu.CUDAAccelerator.is_available", return_value=True)
@mock.patch.dict(os.environ, {"PL_TORCH_DISTRIBUTED_BACKEND": "gloo"}, clear=True)
def test_ddp_torch_dist_is_available_in_setup(
mock_gpu_is_available, mock_device_count, mock_cuda_available, mock_set_device, tmpdir
):
@@ -98,10 +92,15 @@ def setup(self, stage: str) -> None:
raise SystemExit()

model = TestModel()
trainer = Trainer(default_root_dir=tmpdir, fast_dev_run=True, strategy="ddp", accelerator="gpu", devices=1)
with pytest.deprecated_call(match="Environment variable `PL_TORCH_DISTRIBUTED_BACKEND` was deprecated in v1.6"):
with pytest.raises(SystemExit):
trainer.fit(model)
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
strategy=DDPStrategy(process_group_backend="gloo"),
accelerator="gpu",
devices=1,
)
with pytest.raises(SystemExit):
trainer.fit(model)


@RunIf(min_cuda_gpus=2, min_torch="1.8.1", standalone=True)
@@ -143,17 +142,15 @@ def on_train_start(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule")


@pytest.mark.parametrize(
["process_group_backend", "env_var", "device_str", "expected_process_group_backend"],
["process_group_backend", "device_str", "expected_process_group_backend"],
[
pytest.param("foo", None, "cpu", "foo"),
pytest.param("foo", "BAR", "cpu", "foo"),
pytest.param("foo", "BAR", "cuda:0", "foo"),
pytest.param(None, "BAR", "cuda:0", "BAR"),
pytest.param(None, None, "cuda:0", "nccl"),
pytest.param(None, None, "cpu", "gloo"),
pytest.param("foo", "cpu", "foo"),
pytest.param("foo", "cuda:0", "foo"),
pytest.param(None, "cuda:0", "nccl"),
pytest.param(None, "cpu", "gloo"),
],
)
def test_ddp_process_group_backend(process_group_backend, env_var, device_str, expected_process_group_backend):
def test_ddp_process_group_backend(process_group_backend, device_str, expected_process_group_backend):
"""Test settings for process group backend."""

class MockDDPStrategy(DDPStrategy):
@@ -166,14 +163,7 @@ def root_device(self):
return self._root_device

strategy = MockDDPStrategy(process_group_backend=process_group_backend, root_device=torch.device(device_str))
if not process_group_backend and env_var:
with mock.patch.dict(os.environ, {"PL_TORCH_DISTRIBUTED_BACKEND": env_var}):
with pytest.deprecated_call(
match="Environment variable `PL_TORCH_DISTRIBUTED_BACKEND` was deprecated in v1.6"
):
assert strategy._get_process_group_backend() == expected_process_group_backend
else:
assert strategy._get_process_group_backend() == expected_process_group_backend
assert strategy._get_process_group_backend() == expected_process_group_backend


@pytest.mark.parametrize(