Skip to content

Commit e0f84be

Browse files
ethanwharrislantiga
authored andcommitted
[App] Add lightning open command (#16482)
(cherry picked from commit 4a802e0)
1 parent fc8c48f commit e0f84be

File tree

4 files changed

+468
-106
lines changed

4 files changed

+468
-106
lines changed

src/lightning_app/cli/lightning_cli.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from lightning_app.cli.lightning_cli_delete import delete
3131
from lightning_app.cli.lightning_cli_list import get_list
3232
from lightning_app.core.constants import DEBUG, ENABLE_APP_COMMENT_COMMAND_EXECUTION, get_lightning_cloud_url
33+
from lightning_app.runners.cloud import CloudRuntime
3334
from lightning_app.runners.runtime import dispatch
3435
from lightning_app.runners.runtime_type import RuntimeType
3536
from lightning_app.utilities.app_commands import run_app_commands
@@ -380,6 +381,27 @@ def run_app(
380381

381382
run.add_command(_run_model)
382383

384+
385+
@_main.command("open", hidden=True)
386+
@click.argument("path", type=str, default=".")
387+
@click.option(
388+
"--cluster-id",
389+
type=str,
390+
default=None,
391+
help="Open on a specific Lightning AI BYOC compute cluster",
392+
)
393+
@click.option("--name", help="The name to use for the CloudSpace", default="", type=str)
394+
def open(path: str, cluster_id: str, name: str) -> None:
395+
"""Open files or folders from your machine in a Lightning CloudSpace."""
396+
397+
if not os.path.exists(path):
398+
click.echo(f"The provided path `{path}` doesn't exist.")
399+
sys.exit(1)
400+
401+
runtime = CloudRuntime(entrypoint=Path(path))
402+
runtime.open(name, cluster_id)
403+
404+
383405
_main.add_command(get_list)
384406
_main.add_command(delete)
385407
_main.add_command(create)

src/lightning_app/runners/cloud.py

Lines changed: 156 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import Any, Dict, List, Optional, Tuple, Union
1313

1414
import click
15+
import rich
1516
from lightning_cloud.openapi import (
1617
Body3,
1718
Body4,
@@ -67,7 +68,7 @@
6768
from lightning_app.runners.backends.cloud import CloudBackend
6869
from lightning_app.runners.runtime import Runtime
6970
from lightning_app.source_code import LocalSourceCodeDir
70-
from lightning_app.source_code.copytree import _filter_ignored, _parse_lightningignore
71+
from lightning_app.source_code.copytree import _filter_ignored, _IGNORE_FUNCTION, _parse_lightningignore
7172
from lightning_app.storage import Drive, Mount
7273
from lightning_app.utilities.app_helpers import _is_headless, Logger
7374
from lightning_app.utilities.auth import _credential_string_to_basic_auth_params
@@ -106,6 +107,90 @@ def _to_clean_dict(swagger_object, map_attributes):
106107
class CloudRuntime(Runtime):
107108
backend: Union[str, CloudBackend] = "cloud"
108109

110+
def open(self, name: str, cluster_id: Optional[str] = None):
111+
"""Method to open a CloudSpace with the root folder uploaded."""
112+
try:
113+
# Check for feature support
114+
user = self.backend.client.auth_service_get_user()
115+
if not user.features.code_tab:
116+
rich.print(
117+
"[red]The `lightning open` command has not been enabled for your account. "
118+
"To request access, please contact [email protected][/red]"
119+
)
120+
sys.exit(1)
121+
122+
# Dispatch in four phases: resolution, validation, spec creation, API transactions
123+
# Resolution
124+
cloudspace_config = self._resolve_config(name, load=False)
125+
root = self._resolve_root()
126+
ignore_functions = self._resolve_open_ignore_functions()
127+
repo = self._resolve_repo(root, ignore_functions)
128+
project = self._resolve_project()
129+
existing_cloudspaces = self._resolve_existing_cloudspaces(project, cloudspace_config.name)
130+
cluster_id = self._resolve_cluster_id(cluster_id, project.project_id, existing_cloudspaces)
131+
existing_cloudspace, existing_run_instance = self._resolve_existing_run_instance(
132+
cluster_id, project.project_id, existing_cloudspaces
133+
)
134+
cloudspace_name = self._resolve_cloudspace_name(
135+
cloudspace_config.name,
136+
existing_cloudspace,
137+
existing_cloudspaces,
138+
)
139+
needs_credits = self._resolve_needs_credits(project)
140+
141+
# Validation
142+
# Note: We do not validate the repo here since open only uploads a directory if asked explicitly
143+
self._validate_cluster_id(cluster_id, project.project_id)
144+
145+
# Spec creation
146+
run_body = self._get_run_body(cluster_id, [], None, [], True, root, self.start_server)
147+
148+
if existing_run_instance is not None:
149+
print(
150+
f"Re-opening the CloudSpace {cloudspace_config.name}. "
151+
"This operation will create a new run but will not overwrite the files in your CloudSpace."
152+
)
153+
else:
154+
print(f"The name of the CloudSpace is: {cloudspace_config.name}")
155+
156+
# API transactions
157+
cloudspace_id = self._api_create_cloudspace_if_not_exists(
158+
project.project_id,
159+
cloudspace_name,
160+
existing_cloudspace,
161+
)
162+
self._api_stop_existing_run_instance(project.project_id, existing_run_instance)
163+
run = self._api_create_run(project.project_id, cloudspace_id, run_body)
164+
self._api_package_and_upload_repo(repo, run)
165+
166+
if getattr(run, "cluster_id", None):
167+
print(f"Running on {run.cluster_id}")
168+
169+
# TODO: We shouldn't need to create an instance here
170+
if existing_run_instance is not None:
171+
run_instance = self._api_transfer_run_instance(
172+
project.project_id,
173+
run.id,
174+
existing_run_instance.id,
175+
V1LightningappInstanceState.STOPPED,
176+
)
177+
else:
178+
run_instance = self._api_create_run_instance(
179+
cluster_id,
180+
project.project_id,
181+
cloudspace_name,
182+
cloudspace_id,
183+
run.id,
184+
V1LightningappInstanceState.STOPPED,
185+
)
186+
187+
if "PYTEST_CURRENT_TEST" not in os.environ:
188+
click.launch(self._get_app_url(run_instance, "code", needs_credits))
189+
190+
except ApiException as e:
191+
logger.error(e.body)
192+
sys.exit(1)
193+
109194
def dispatch(
110195
self,
111196
name: str = "",
@@ -116,10 +201,10 @@ def dispatch(
116201
) -> None:
117202
"""Method to dispatch and run the :class:`~lightning_app.core.app.LightningApp` in the cloud."""
118203
# not user facing error ideally - this should never happen in normal user workflow
119-
if not self.entrypoint_file:
204+
if not self.entrypoint:
120205
raise ValueError(
121206
"Entrypoint file not provided. Did you forget to "
122-
"initialize the Runtime object with `entrypoint_file` argument?"
207+
"initialize the Runtime object with `entrypoint` argument?"
123208
)
124209

125210
cleanup_handle = None
@@ -213,20 +298,20 @@ def dispatch(
213298
env_vars,
214299
auth,
215300
)
301+
302+
if run_instance.status.phase == V1LightningappInstanceState.FAILED:
303+
raise RuntimeError("Failed to create the application. Cannot upload the source code.")
304+
305+
# TODO: Remove testing dependency, but this would open a tab for each test...
306+
if open_ui and "PYTEST_CURRENT_TEST" not in os.environ:
307+
click.launch(self._get_app_url(run_instance, "logs" if run.is_headless else "web-ui", needs_credits))
216308
except ApiException as e:
217309
logger.error(e.body)
218310
sys.exit(1)
219311
finally:
220312
if cleanup_handle:
221313
cleanup_handle()
222314

223-
if run_instance.status.phase == V1LightningappInstanceState.FAILED:
224-
raise RuntimeError("Failed to create the application. Cannot upload the source code.")
225-
226-
# TODO: Remove testing dependency, but this would open a tab for each test...
227-
if open_ui and "PYTEST_CURRENT_TEST" not in os.environ:
228-
click.launch(self._get_app_url(run_instance, needs_credits))
229-
230315
@classmethod
231316
def load_app_from_file(cls, filepath: str) -> "LightningApp":
232317
"""Load a LightningApp from a file, mocking the imports."""
@@ -248,36 +333,55 @@ def load_app_from_file(cls, filepath: str) -> "LightningApp":
248333
del os.environ["LAI_RUNNING_IN_CLOUD"]
249334
return app
250335

251-
def _resolve_config(self, name: Optional[str]) -> AppConfig:
336+
def _resolve_config(self, name: Optional[str], load: bool = True) -> AppConfig:
252337
"""Find and load the config file if it exists (otherwise create an empty config).
253338
254339
Override the name if provided.
255340
"""
256-
config_file = _get_config_file(self.entrypoint_file)
257-
cloudspace_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
341+
config_file = _get_config_file(self.entrypoint)
342+
cloudspace_config = AppConfig.load_from_file(config_file) if config_file.exists() and load else AppConfig()
258343
if name:
259344
# Override the name if provided
260345
cloudspace_config.name = name
261346
return cloudspace_config
262347

263348
def _resolve_root(self) -> Path:
264349
"""Determine the root of the project."""
265-
return Path(self.entrypoint_file).absolute().parent
350+
root = Path(self.entrypoint).absolute()
351+
if root.is_file():
352+
root = root.parent
353+
return root
354+
355+
def _resolve_open_ignore_functions(self) -> List[_IGNORE_FUNCTION]:
356+
"""Used by the ``open`` method.
266357
267-
def _resolve_repo(self, root: Path) -> LocalSourceCodeDir:
358+
If the entrypoint is a file, return an ignore function that will ignore everything except that file so only the
359+
file gets uploaded.
360+
"""
361+
entrypoint = self.entrypoint.absolute()
362+
if entrypoint.is_file():
363+
return [lambda src, paths: [path for path in paths if path.absolute() == entrypoint]]
364+
return []
365+
366+
def _resolve_repo(
367+
self,
368+
root: Path,
369+
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
370+
) -> LocalSourceCodeDir:
268371
"""Gather and merge all lightningignores from the app children and create the ``LocalSourceCodeDir``
269372
object."""
270-
271-
flow_lightningignores = [flow.lightningignore for flow in self.app.flows]
272-
work_lightningignores = [work.lightningignore for work in self.app.works]
273-
lightningignores = flow_lightningignores + work_lightningignores
274-
if lightningignores:
275-
merged = sum(lightningignores, tuple())
276-
logger.debug(f"Found the following lightningignores: {merged}")
277-
patterns = _parse_lightningignore(merged)
278-
ignore_functions = [partial(_filter_ignored, root, patterns)]
279-
else:
280-
ignore_functions = None
373+
if ignore_functions is None:
374+
ignore_functions = []
375+
376+
if self.app is not None:
377+
flow_lightningignores = [flow.lightningignore for flow in self.app.flows]
378+
work_lightningignores = [work.lightningignore for work in self.app.works]
379+
lightningignores = flow_lightningignores + work_lightningignores
380+
if lightningignores:
381+
merged = sum(lightningignores, tuple())
382+
logger.debug(f"Found the following lightningignores: {merged}")
383+
patterns = _parse_lightningignore(merged)
384+
ignore_functions = [*ignore_functions, partial(_filter_ignored, root, patterns)]
281385

282386
return LocalSourceCodeDir(path=root, ignore_functions=ignore_functions)
283387

@@ -562,7 +666,7 @@ def _get_run_body(
562666
self,
563667
cluster_id: str,
564668
flow_servers: List[V1Flowserver],
565-
network_configs: List[V1NetworkConfig],
669+
network_configs: Optional[List[V1NetworkConfig]],
566670
works: List[V1Work],
567671
no_cache: bool,
568672
root: Path,
@@ -571,24 +675,28 @@ def _get_run_body(
571675
"""Get the specification of the run creation request."""
572676
# The entry point file needs to be relative to the root of the uploaded source file directory,
573677
# because the backend will invoke the lightning commands relative said source directory
574-
app_entrypoint_file = Path(self.entrypoint_file).absolute().relative_to(root)
678+
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
679+
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)
575680

576681
run_body = CloudspaceIdRunsBody(
577682
cluster_id=cluster_id,
578683
app_entrypoint_file=str(app_entrypoint_file),
579684
enable_app_server=start_server,
580685
flow_servers=flow_servers,
581686
network_config=network_configs,
582-
user_requested_flow_compute_config=V1UserRequestedFlowComputeConfig(
583-
name=self.app.flow_cloud_compute.name,
584-
shm_size=self.app.flow_cloud_compute.shm_size,
585-
preemptible=False,
586-
),
587687
works=works,
588688
local_source=True,
589-
is_headless=_is_headless(self.app),
590689
)
591690

691+
if self.app is not None:
692+
run_body.user_requested_flow_compute_config = V1UserRequestedFlowComputeConfig(
693+
name=self.app.flow_cloud_compute.name,
694+
shm_size=self.app.flow_cloud_compute.shm_size,
695+
preemptible=False,
696+
)
697+
698+
run_body.is_headless = _is_headless(self.app)
699+
592700
# if requirements file at the root of the repository is present,
593701
# we pass just the file name to the backend, so backend can find it in the relative path
594702
requirements_file = root / "requirements.txt"
@@ -695,9 +803,9 @@ def _api_transfer_run_instance(
695803
run_id: str,
696804
instance_id: str,
697805
desired_state: V1LightningappInstanceState,
698-
queue_server_type: V1QueueServerType,
699-
env_vars: List[V1EnvVar],
700-
auth: V1LightningAuth,
806+
queue_server_type: Optional[V1QueueServerType] = None,
807+
env_vars: Optional[List[V1EnvVar]] = None,
808+
auth: Optional[V1LightningAuth] = None,
701809
) -> Externalv1LightningappInstance:
702810
"""Transfer an existing instance to the given run ID and update its specification.
703811
@@ -732,9 +840,9 @@ def _api_create_run_instance(
732840
cloudspace_id: str,
733841
run_id: str,
734842
desired_state: V1LightningappInstanceState,
735-
queue_server_type: V1QueueServerType,
736-
env_vars: List[V1EnvVar],
737-
auth: V1LightningAuth,
843+
queue_server_type: Optional[V1QueueServerType] = None,
844+
env_vars: Optional[List[V1EnvVar]] = None,
845+
auth: Optional[V1LightningAuth] = None,
738846
) -> Externalv1LightningappInstance:
739847
"""Create a new instance of the given run with the given specification."""
740848
return self.backend.client.cloud_space_service_create_lightning_run_instance(
@@ -775,7 +883,12 @@ def _print_specs(run_body: CloudspaceIdRunsBody, print_format: str) -> None:
775883
requirements_path = getattr(getattr(run_body.image_spec, "dependency_file_info", ""), "path", "")
776884
logger.info(f"requirements_path: {requirements_path}")
777885

778-
@staticmethod
779-
def _get_app_url(lightning_app_instance: Externalv1LightningappInstance, need_credits: bool = False) -> str:
886+
def _get_app_url(
887+
self,
888+
run_instance: Externalv1LightningappInstance,
889+
tab: str,
890+
need_credits: bool = False,
891+
) -> str:
892+
user = self.backend.client.auth_service_get_user()
780893
action = "?action=add_credits" if need_credits else ""
781-
return f"{get_lightning_cloud_url()}/me/apps/{lightning_app_instance.id}{action}"
894+
return f"{get_lightning_cloud_url()}/{user.username}/apps/{run_instance.id}/{tab}{action}"

src/lightning_app/runners/runtime.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def dispatch(
7272

7373
runtime = runtime_cls(
7474
app=app,
75-
entrypoint_file=entrypoint_file,
75+
entrypoint=entrypoint_file,
7676
start_server=start_server,
7777
host=host,
7878
port=port,
@@ -90,8 +90,8 @@ def dispatch(
9090

9191
@dataclass
9292
class Runtime:
93-
app: LightningApp
94-
entrypoint_file: Optional[Path] = None
93+
app: Optional[LightningApp] = None
94+
entrypoint: Optional[Path] = None
9595
start_server: bool = True
9696
host: str = APP_SERVER_HOST
9797
port: int = APP_SERVER_PORT
@@ -107,9 +107,10 @@ class Runtime:
107107

108108
def __post_init__(self):
109109
if isinstance(self.backend, str):
110-
self.backend = BackendType(self.backend).get_backend(self.entrypoint_file)
110+
self.backend = BackendType(self.backend).get_backend(self.entrypoint)
111111

112-
LightningFlow._attach_backend(self.app.root, self.backend)
112+
if self.app is not None:
113+
LightningFlow._attach_backend(self.app.root, self.backend)
113114

114115
def terminate(self) -> None:
115116
"""This method is used to terminate all the objects (threads, processes, etc..) created by the app."""

0 commit comments

Comments
 (0)