Skip to content

Commit 788d9dd

Browse files
tchaton authored and Borda committed
[App] Enable uploading files to a project. (#16631)
* update * update * update * update * update * update * update * update * update * update * update * update * update * update --------- Co-authored-by: thomas <[email protected]> (cherry picked from commit 55a0caa)
1 parent ba0fd98 commit 788d9dd

File tree

9 files changed

+152
-75
lines changed

9 files changed

+152
-75
lines changed

requirements/app/base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
lightning-cloud>=0.5.20
1+
lightning-cloud>=0.5.22
22
packaging
33
typing-extensions>=4.0.0, <=4.4.0
44
deepdiff>=5.7.0, <6.2.4

src/lightning_app/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1313

1414
- Added FileSystem abstraction to simplify manipulation of files ([#16581](https://github.com/Lightning-AI/lightning/pull/16581))
1515

16+
- Enabled `cp` (upload) at project level ([#16631](https://github.com/Lightning-AI/lightning/pull/16631))
1617
- Enabled `ls` and `cp` (download) at project level ([#16622](https://github.com/Lightning-AI/lightning/pull/16622))
1718
- Added Storage Commands ([#16606](https://github.com/Lightning-AI/lightning/pull/16606))
1819
* `ls`: List files from your Cloud Platform Filesystem
@@ -41,6 +42,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
4142

4243
- Fixed the Drive root_folder not parsed properly ([#16454](https://github.com/Lightning-AI/lightning/pull/16454))
4344

45+
- Fixed malformed path when downloading files using `lightning cp` ([#16626](https://github.com/Lightning-AI/lightning/pull/16626))
4446

4547
## [1.9.0] - 2023-01-17
4648

src/lightning_app/cli/commands/cp.py

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@
2222

2323
import click
2424
import requests
25-
import rich
2625
import urllib3
27-
from lightning_cloud.openapi import Externalv1LightningappInstance, IdArtifactsBody, V1CloudSpace
26+
from lightning_cloud.openapi import Externalv1LightningappInstance, ProjectIdStorageBody, V1CloudSpace
2827
from rich.live import Live
2928
from rich.progress import BarColumn, DownloadColumn, Progress, Task, TextColumn
3029
from rich.spinner import Spinner
@@ -34,6 +33,7 @@
3433
from lightning_app.cli.commands.pwd import _pwd
3534
from lightning_app.source_code import FileUploader
3635
from lightning_app.utilities.app_helpers import Logger
36+
from lightning_app.utilities.cli_helpers import _error_and_exit
3737
from lightning_app.utilities.network import LightningClient
3838

3939
logger = Logger(__name__)
@@ -55,7 +55,7 @@ def cp(src_path: str, dst_path: str, r: bool = False, recursive: bool = False) -
5555
pwd = _pwd()
5656

5757
if pwd == "/" or len(pwd.split("/")) == 1:
58-
return _error_and_exit("Uploading files at the project level isn't supported yet.")
58+
return _error_and_exit("Uploading files at the project level isn't allowed yet.")
5959

6060
client = LightningClient()
6161

@@ -74,10 +74,18 @@ def cp(src_path: str, dst_path: str, r: bool = False, recursive: bool = False) -
7474

7575

7676
def _upload_files(live, client: LightningClient, local_src: str, remote_dst: str, pwd: str) -> str:
77+
remote_splits = [split for split in remote_dst.split("/") if split != ""]
78+
remote_dst = os.path.join(*remote_splits)
79+
7780
if not os.path.exists(local_src):
7881
return _error_and_exit(f"The provided source path {local_src} doesn't exist.")
7982

80-
project_id, app_id = _get_project_app_ids(pwd)
83+
lit_resource = None
84+
85+
if len(remote_splits) > 1:
86+
project_id, lit_resource = _get_project_id_and_resource(pwd)
87+
else:
88+
project_id = _get_project_id_from_name(remote_dst)
8189

8290
local_src = Path(local_src).resolve()
8391
upload_paths = []
@@ -91,21 +99,34 @@ def _upload_files(live, client: LightningClient, local_src: str, remote_dst: str
9199

92100
upload_urls = []
93101

102+
clusters = client.projects_service_list_project_cluster_bindings(project_id)
103+
94104
for upload_path in upload_paths:
95-
filename = str(upload_path).replace(str(os.getcwd()), "")[1:]
96-
response = client.lightningapp_instance_service_upload_lightningapp_instance_artifact(
97-
project_id=project_id,
98-
id=app_id,
99-
body=IdArtifactsBody(filename),
100-
async_req=True,
101-
)
102-
upload_urls.append(response)
105+
for cluster in clusters.clusters:
106+
filename = str(upload_path).replace(str(os.getcwd()), "")[1:]
107+
if lit_resource:
108+
filename = _get_prefix(os.path.join(remote_dst, filename), lit_resource)
109+
else:
110+
filename = "/" + filename
111+
112+
response = client.lightningapp_instance_service_upload_project_artifact(
113+
project_id=project_id,
114+
body=ProjectIdStorageBody(cluster_id=cluster.cluster_id, filename=filename),
115+
async_req=True,
116+
)
117+
upload_urls.append(response)
118+
119+
upload_urls = [upload_url.get().upload_url for upload_url in upload_urls]
103120

104121
live.stop()
105122

123+
if not upload_paths:
124+
print("There were no files to upload.")
125+
return
126+
106127
progress = _get_progress_bar()
107128

108-
total_size = sum([Path(path).stat().st_size for path in upload_paths])
129+
total_size = sum([Path(path).stat().st_size for path in upload_paths]) // max(len(clusters.clusters), 1)
109130
task_id = progress.add_task("upload", filename="", total=total_size)
110131

111132
progress.start()
@@ -126,7 +147,7 @@ def _upload_files(live, client: LightningClient, local_src: str, remote_dst: str
126147
def _upload(source_file: str, presigned_url: ApplyResult, progress: Progress, task_id: Task) -> Optional[Exception]:
127148
source_file = Path(source_file)
128149
file_uploader = FileUploader(
129-
presigned_url.get().upload_url,
150+
presigned_url,
130151
source_file,
131152
total_size=None,
132153
name=str(source_file),
@@ -143,13 +164,13 @@ def _download_files(live, client, remote_src: str, local_dst: str, pwd: str):
143164
download_urls = []
144165
total_size = []
145166

146-
prefix = _get_prefix("/".join(pwd.split("/")[3:]), lit_resource)
167+
prefix = _get_prefix("/".join(pwd.split("/")[3:]), lit_resource) + "/"
147168

148169
for artifact in _collect_artifacts(client, project_id, prefix, include_download_url=True):
149170
path = os.path.join(local_dst, artifact.filename.replace(remote_src, ""))
150171
path = Path(path).resolve()
151172
os.makedirs(path.parent, exist_ok=True)
152-
download_paths.append(Path(path).resolve())
173+
download_paths.append(path)
153174
download_urls.append(artifact.url)
154175
total_size.append(int(artifact.size_bytes))
155176

@@ -182,14 +203,17 @@ def _download_file(path: str, url: str, progress: Progress, task_id: Task) -> No
182203
# Disable warning about making an insecure request
183204
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
184205

185-
request = requests.get(url, stream=True, verify=False)
206+
try:
207+
request = requests.get(url, stream=True, verify=False)
186208

187-
chunk_size = 1024
209+
chunk_size = 1024
188210

189-
with open(path, "wb") as fp:
190-
for chunk in request.iter_content(chunk_size=chunk_size):
191-
fp.write(chunk) # type: ignore
192-
progress.update(task_id, advance=len(chunk))
211+
with open(path, "wb") as fp:
212+
for chunk in request.iter_content(chunk_size=chunk_size):
213+
fp.write(chunk) # type: ignore
214+
progress.update(task_id, advance=len(chunk))
215+
except ConnectionError:
216+
pass
193217

194218

195219
def _sanitize_path(path: str, pwd: str) -> Tuple[str, bool]:
@@ -211,29 +235,6 @@ def _remove_remote(path: str) -> str:
211235
return path.replace("r:", "").replace("remote:", "")
212236

213237

214-
def _error_and_exit(msg: str) -> str:
215-
rich.print(f"[red]ERROR[/red]: {msg}")
216-
sys.exit(0)
217-
218-
219-
# TODO: To be removed when upload is supported for CloudSpaces.
220-
def _get_project_app_ids(pwd: str) -> Tuple[str, str]:
221-
"""Convert a root path to a project id and app id."""
222-
# TODO: Handle project level
223-
project_name, app_name, *_ = pwd.split("/")[1:3]
224-
client = LightningClient()
225-
projects = client.projects_service_list_memberships()
226-
project_id = [project.project_id for project in projects.memberships if project.name == project_name][0]
227-
client = LightningClient()
228-
lit_apps = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id).lightningapps
229-
lit_apps = [lit_app for lit_app in lit_apps if lit_app.name == app_name]
230-
if len(lit_apps) != 1:
231-
print(f"ERROR: There isn't any Lightning App matching the name {app_name}.")
232-
sys.exit(0)
233-
lit_app = lit_apps[0]
234-
return project_id, lit_app.id
235-
236-
237238
def _get_project_id_and_resource(pwd: str) -> Tuple[str, Union[Externalv1LightningappInstance, V1CloudSpace]]:
238239
"""Convert a root path to a project id and app id."""
239240
# TODO: Handle project level
@@ -263,6 +264,13 @@ def _get_project_id_and_resource(pwd: str) -> Tuple[str, Union[Externalv1Lightni
263264
return project_id, lit_ressources[0]
264265

265266

267+
def _get_project_id_from_name(project_name: str) -> str:
268+
# 1. Collect the projects of the user
269+
client = LightningClient()
270+
projects = client.projects_service_list_memberships()
271+
return [project.project_id for project in projects.memberships if project.name == project_name][0]
272+
273+
266274
def _get_progress_bar():
267275
return Progress(
268276
TextColumn("[bold blue]{task.description}", justify="left"),

src/lightning_app/cli/commands/ls.py

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import click
2020
import rich
21+
from fastapi import HTTPException
2122
from lightning_cloud.openapi import Externalv1LightningappInstance
2223
from rich.console import Console
2324
from rich.live import Live
@@ -27,6 +28,7 @@
2728
from lightning_app.cli.commands.cd import _CD_FILE
2829
from lightning_app.cli.commands.connection import _LIGHTNING_CONNECTION_FOLDER
2930
from lightning_app.utilities.app_helpers import Logger
31+
from lightning_app.utilities.cli_helpers import _error_and_exit
3032
from lightning_app.utilities.network import LightningClient
3133

3234
_FOLDER_COLOR = "sky_blue1"
@@ -71,7 +73,15 @@ def ls(path: Optional[str] = None) -> List[str]:
7173
# TODO: Add support for CloudSpaces, etc..
7274
splits = root.split("/")[1:]
7375

74-
project_id = [project.project_id for project in projects.memberships if project.name == splits[0]][0]
76+
project = [project for project in projects.memberships if project.name == splits[0]]
77+
78+
# This happens if the user changes cluster and the project doesn't exist.
79+
if len(project) == 0:
80+
return _error_and_exit(
81+
f"There isn't any Lightning Project matching the name {splits[0]}." " HINT: Use `lightning cd`."
82+
)
83+
84+
project_id = project[0].project_id
7585

7686
lit_apps = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id).lightningapps
7787

@@ -91,9 +101,7 @@ def ls(path: Optional[str] = None) -> List[str]:
91101
lit_ressources = [lit_resource for lit_resource in lit_apps if lit_resource.name == splits[1]]
92102

93103
if len(lit_ressources) == 0:
94-
95-
print(f"ERROR: There isn't any Lightning Ressource matching the name {splits[1]}.")
96-
sys.exit(0)
104+
_error_and_exit(f"There isn't any Lightning Ressource matching the name {splits[1]}.")
97105

98106
lit_resource = lit_ressources[0]
99107

@@ -212,32 +220,44 @@ def _collect_artifacts(
212220
if page_token in tokens:
213221
return
214222

215-
response = client.lightningapp_instance_service_list_project_artifacts(
216-
project_id,
217-
prefix=prefix,
218-
cluster_id=cluster_id,
219-
page_token=page_token,
220-
include_download_url=include_download_url,
221-
page_size=str(page_size),
222-
)
223-
yield from response.artifacts
224-
225-
if response.next_page_token != "":
226-
tokens.append(page_token)
227-
yield from _collect_artifacts(
228-
client,
223+
try:
224+
response = client.lightningapp_instance_service_list_project_artifacts(
229225
project_id,
230226
prefix=prefix,
231227
cluster_id=cluster_id,
232-
page_token=response.next_page_token,
233-
tokens=tokens,
228+
page_token=page_token,
229+
include_download_url=include_download_url,
230+
page_size=str(page_size),
234231
)
232+
if response:
233+
for artifact in response.artifacts:
234+
if ".lightning-app-sync" in artifact.filename:
235+
continue
236+
yield artifact
237+
238+
if response.next_page_token != "":
239+
tokens.append(page_token)
240+
yield from _collect_artifacts(
241+
client,
242+
project_id,
243+
prefix=prefix,
244+
cluster_id=cluster_id,
245+
page_token=response.next_page_token,
246+
tokens=tokens,
247+
)
248+
except HTTPException:
249+
# Note: This is triggered when the request is wrong.
250+
# This is currently happening due to looping through the user clusters.
251+
pass
235252

236253

237254
def _add_resource_prefix(prefix: str, resource_path: str):
238255
if resource_path in prefix:
239256
return prefix
240-
return "/" + os.path.join(resource_path, prefix)
257+
prefix = os.path.join(resource_path, prefix)
258+
if not prefix.startswith("/"):
259+
prefix = "/" + prefix
260+
return prefix
241261

242262

243263
def _get_prefix(prefix: str, lit_resource) -> str:

src/lightning_app/storage/filesystem.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,13 @@ def put(self, src_path: str, dst_path: str, put_fn: Callable = _copy_files) -> N
5050
if not dst_path.startswith("/"):
5151
raise Exception(f"The provided destination {dst_path} needs to start with `/`.")
5252

53+
if dst_path == "/":
54+
dst_path = os.path.join(self._root, os.path.basename(src_path))
55+
else:
56+
dst_path = os.path.join(self._root, dst_path[1:])
57+
5358
src = Path(src_path).resolve()
54-
dst = Path(os.path.join(self._root, dst_path[1:])).resolve()
59+
dst = Path(dst_path).resolve()
5560

5661
return put_fn(src, dst, fs=self._fs)
5762

src/lightning_app/utilities/cli_helpers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import click
2626
import packaging
2727
import requests
28+
import rich
2829

2930
from lightning_app import __package_name__, __version__
3031
from lightning_app.core.constants import APP_SERVER_PORT
@@ -349,3 +350,8 @@ def _check_environment_and_redirect():
349350

350351
_redirect_command(env_executable)
351352
return
353+
354+
355+
def _error_and_exit(msg: str) -> str:
356+
rich.print(f"[red]ERROR[/red]: {msg}")
357+
sys.exit(0)

src/lightning_app/utilities/network.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import ast
1516
import socket
1617
import time
1718
from functools import wraps
@@ -21,6 +22,7 @@
2122
import lightning_cloud
2223
import requests
2324
import urllib3
25+
from fastapi import HTTPException
2426
from lightning_cloud.rest_client import create_swagger_client, GridRestClient
2527
from requests import Session
2628
from requests.adapters import HTTPAdapter
@@ -137,6 +139,8 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
137139
try:
138140
return func(self, *args, **kwargs)
139141
except lightning_cloud.openapi.rest.ApiException as e:
142+
if e.status == 500:
143+
raise HTTPException(status_code=500, detail=ast.literal_eval(e.body.decode("utf-8"))["message"])
140144
# retry if the control plane fails with all errors except 4xx but not 408 - (Request Timeout)
141145
if e.status == 408 or e.status == 409 or not str(e.status).startswith("4"):
142146
consecutive_errors += 1

0 commit comments

Comments
 (0)