Skip to content

Commit c41c88e

Browse files
tchatonlantiga
authored andcommitted
Add FileSystem (#16581)
Co-authored-by: Luca Antiga <[email protected]> Co-authored-by: thomas <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> (cherry picked from commit 695fb6a)
1 parent d881cc9 commit c41c88e

File tree

6 files changed

+228
-2
lines changed

6 files changed

+228
-2
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ module = [
151151
"lightning_app.source_code.tar",
152152
"lightning_app.source_code.uploader",
153153
"lightning_app.storage.copier",
154+
"lightning_app.storage.filesystem",
154155
"lightning_app.storage.drive",
155156
"lightning_app.storage.orchestrator",
156157
"lightning_app.storage.path",

src/lightning_app/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1111

1212
- Added experimental support for interruptible GPU in the cloud ([#16399](https://github.com/Lightning-AI/lightning/pull/16399))
1313

14+
- Added FileSystem abstraction to simply manipulation of files ([#16581](https://github.com/Lightning-AI/lightning/pull/16581))
1415

1516
### Changed
1617

src/lightning_app/storage/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from lightning_app.storage.drive import Drive # noqa: F401
2+
from lightning_app.storage.filesystem import FileSystem # noqa: F401
23
from lightning_app.storage.mount import Mount # noqa: F401
34
from lightning_app.storage.orchestrator import StorageOrchestrator # noqa: F401
45
from lightning_app.storage.path import Path # noqa: F401

src/lightning_app/storage/copier.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from time import time
2020
from typing import Optional, TYPE_CHECKING, Union
2121

22+
from fsspec import AbstractFileSystem
2223
from fsspec.implementations.local import LocalFileSystem
2324

2425
from lightning_app.core.queues import BaseQueue
@@ -103,15 +104,20 @@ def _find_matching_path(work, request: _GetRequest) -> Optional["lightning_app.s
103104
return candidate
104105

105106

106-
def _copy_files(source_path: pathlib.Path, destination_path: pathlib.Path) -> None:
107+
def _copy_files(
108+
source_path: pathlib.Path,
109+
destination_path: pathlib.Path,
110+
fs: Optional[AbstractFileSystem] = None,
111+
) -> None:
107112
"""Copy files from one path to another.
108113
109114
The source path must either be an existing file or folder. If the source is a folder, the destination path is
110115
interpreted as a folder as well. If the source is a file, the destination path is interpreted as a file too.
111116
112117
Files in a folder are copied recursively and efficiently using multiple threads.
113118
"""
114-
fs = _filesystem()
119+
if fs is None:
120+
fs = _filesystem()
115121

116122
def _copy(from_path: pathlib.Path, to_path: pathlib.Path) -> Optional[Exception]:
117123
_logger.debug(f"Copying {str(from_path)} -> {str(to_path)}")
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import os
2+
import shutil
3+
from pathlib import Path
4+
from typing import Callable, List
5+
6+
from fsspec.implementations.local import LocalFileSystem
7+
8+
from lightning_app.storage.copier import _copy_files
9+
from lightning_app.storage.path import _filesystem, _shared_storage_path
10+
11+
12+
def _get_files(fs, src: Path, dst: Path, overwrite: bool = True):
13+
dst = dst.resolve()
14+
if fs.isdir(src):
15+
if isinstance(fs, LocalFileSystem):
16+
dst = dst.resolve()
17+
if fs.exists(dst):
18+
if overwrite:
19+
fs.rm(str(dst), recursive=True)
20+
else:
21+
raise FileExistsError(f"The file {dst} was found. Add get(..., overwrite=True) to replace it.")
22+
23+
shutil.copytree(src, dst)
24+
else:
25+
glob = f"{str(src)}/**"
26+
fs.get(glob, str(dst), recursive=False)
27+
else:
28+
fs.get(str(src), str(dst), recursive=False)
29+
30+
31+
class FileSystem:
32+
33+
"""This filesystem enables to easily move files from and to the shared storage."""
34+
35+
def __init__(self) -> None:
36+
self._fs = _filesystem()
37+
self._root = str(_shared_storage_path())
38+
39+
def put(self, src_path: str, dst_path: str, put_fn: Callable = _copy_files) -> None:
40+
"""This method enables to put a file to the shared storage in a blocking fashion.
41+
42+
Arguments:
43+
src_path: The path to your files locally
44+
dst_path: The path to your files transfered in the shared storage.
45+
put_fn: The method to use to put files in the shared storage.
46+
"""
47+
if not os.path.exists(Path(src_path).resolve()):
48+
raise FileExistsError(f"The provided path {src_path} doesn't exist")
49+
50+
if not dst_path.startswith("/"):
51+
raise Exception(f"The provided destination {dst_path} needs to start with `/`.")
52+
53+
src = Path(src_path).resolve()
54+
dst = Path(os.path.join(self._root, dst_path[1:])).resolve()
55+
56+
return put_fn(src, dst, fs=self._fs)
57+
58+
def get(self, src_path: str, dst_path: str, overwrite: bool = True, get_fn: Callable = _get_files) -> None:
59+
"""This method enables to get files from the shared storage in a blocking fashion.
60+
61+
Arguments:
62+
src_path: The path to your files in the shared storage
63+
dst_path: The path to your files transfered locally
64+
get_fn: The method to use to put files in the shared storage.
65+
"""
66+
if not src_path.startswith("/"):
67+
raise Exception(f"The provided destination {src_path} needs to start with `/`.")
68+
69+
src = Path(os.path.join(self._root, src_path[1:])).resolve()
70+
dst = Path(dst_path).resolve()
71+
72+
return get_fn(fs=self._fs, src=src, dst=dst, overwrite=overwrite)
73+
74+
def listdir(self, path: str) -> List[str]:
75+
"""This method enables to list files from the shared storage in a blocking fashion.
76+
77+
Arguments:
78+
path: The path to files to list.
79+
"""
80+
if not path.startswith("/"):
81+
raise Exception(f"The provided destination {path} needs to start with `/`.")
82+
83+
shared_path = Path(os.path.join(self._root, path[1:])).resolve()
84+
85+
if not self._fs.exists(shared_path):
86+
raise RuntimeError(f"The provided path {shared_path} doesn't exist.")
87+
88+
# Invalidate cache before running ls in case new directories have been added
89+
# TODO: Re-evaluate this - may lead to performance issues
90+
self._fs.invalidate_cache()
91+
92+
paths = self._fs.ls(shared_path)
93+
if not paths:
94+
return paths
95+
96+
return sorted([path.replace(self._root + os.sep, "") for path in paths if not path.endswith("info.txt")])
97+
98+
def walk(self, path: str) -> List[str]:
99+
"""This method enables to list files from the shared storage in a blocking fashion.
100+
101+
Arguments:
102+
path: The path to files to list.
103+
"""
104+
if not path.startswith("/"):
105+
raise Exception(f"The provided destination {path} needs to start with `/`.")
106+
107+
shared_path = Path(os.path.join(self._root, path[1:])).resolve()
108+
109+
if not self._fs.exists(shared_path):
110+
raise RuntimeError(f"The provided path {shared_path} doesn't exist.")
111+
112+
# Invalidate cache before running ls in case new directories have been added
113+
# TODO: Re-evaluate this - may lead to performance issues
114+
self._fs.invalidate_cache()
115+
116+
paths = self._fs.ls(shared_path)
117+
if not paths:
118+
return paths
119+
120+
out = []
121+
122+
for shared_path in paths:
123+
path = str(shared_path).replace(self._root, "")
124+
if self._fs.isdir(shared_path):
125+
out.extend(self.walk(path))
126+
else:
127+
if path.endswith("info.txt"):
128+
continue
129+
out.append(path[1:])
130+
return sorted(out)
131+
132+
def rm(self, path) -> None:
133+
if not path.startswith("/"):
134+
raise Exception(f"The provided destination {path} needs to start with `/`.")
135+
136+
delete_path = Path(os.path.join(self._root, path[1:])).resolve()
137+
138+
if self._fs.exists(str(delete_path)):
139+
if self._fs.isdir(str(delete_path)):
140+
self._fs.rmdir(str(delete_path))
141+
else:
142+
self._fs.rm(str(delete_path))
143+
else:
144+
raise Exception(f"The file path {path} doesn't exist.")
145+
146+
def isfile(self, path: str) -> bool:
147+
if not path.startswith("/"):
148+
raise Exception(f"The provided destination {path} needs to start with `/`.")
149+
150+
path = Path(os.path.join(self._root, path[1:])).resolve()
151+
return self._fs.isfile(path)
152+
153+
def isdir(self, path: str) -> bool:
154+
if not path.startswith("/"):
155+
raise Exception(f"The provided destination {path} needs to start with `/`.")
156+
157+
path = Path(os.path.join(self._root, path[1:])).resolve()
158+
return self._fs.isdir(path)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import os
2+
import sys
3+
4+
import pytest
5+
6+
from lightning_app.storage import FileSystem
7+
8+
9+
@pytest.mark.skipif(sys.platform == "win32", reason="TODO: Add support for windows")
10+
def test_filesystem(tmpdir):
11+
fs = FileSystem()
12+
13+
with open(f"{tmpdir}/a.txt", "w") as f:
14+
f.write("example")
15+
16+
os.makedirs(f"{tmpdir}/checkpoints", exist_ok=True)
17+
with open(f"{tmpdir}/checkpoints/a.txt", "w") as f:
18+
f.write("example")
19+
20+
with open(f"{tmpdir}/info.txt", "w") as f:
21+
f.write("example")
22+
23+
assert fs.listdir("/") == []
24+
fs.put(f"{tmpdir}/a.txt", "/a.txt")
25+
fs.put(f"{tmpdir}/info.txt", "/info.txt")
26+
assert fs.listdir("/") == ["a.txt"]
27+
28+
assert fs.isfile("/a.txt")
29+
30+
fs.put(f"{tmpdir}/checkpoints", "/checkpoints")
31+
assert not fs.isfile("/checkpoints")
32+
assert fs.isdir("/checkpoints")
33+
assert fs.isfile("/checkpoints/a.txt")
34+
35+
assert fs.listdir("/") == ["a.txt", "checkpoints"]
36+
assert fs.walk("/") == ["a.txt", "checkpoints/a.txt"]
37+
38+
os.remove(f"{tmpdir}/a.txt")
39+
40+
assert not os.path.exists(f"{tmpdir}/a.txt")
41+
42+
fs.get("/a.txt", f"{tmpdir}/a.txt")
43+
44+
assert os.path.exists(f"{tmpdir}/a.txt")
45+
46+
fs.rm("/a.txt")
47+
48+
assert fs.listdir("/") == ["checkpoints"]
49+
fs.rm("/checkpoints/a.txt")
50+
assert fs.listdir("/") == ["checkpoints"]
51+
assert fs.walk("/checkpoints") == []
52+
fs.rm("/checkpoints/")
53+
assert fs.listdir("/") == []
54+
55+
with pytest.raises(FileExistsError, match="HERE"):
56+
fs.put("HERE", "/HERE")
57+
58+
with pytest.raises(RuntimeError, match="The provided path"):
59+
fs.listdir("/space")

0 commit comments

Comments
 (0)