Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/lightning_app/cli/lightning_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from lightning_app.cli.lightning_cli_delete import delete
from lightning_app.cli.lightning_cli_list import get_list
from lightning_app.core.constants import DEBUG, ENABLE_APP_COMMENT_COMMAND_EXECUTION, get_lightning_cloud_url
from lightning_app.runners.cloud import CloudRuntime
from lightning_app.runners.runtime import dispatch
from lightning_app.runners.runtime_type import RuntimeType
from lightning_app.utilities.app_commands import run_app_commands
Expand Down Expand Up @@ -380,6 +381,27 @@ def run_app(

run.add_command(_run_model)


@_main.command("open")
@click.argument("path", type=str, default=".")
@click.option(
"--cluster-id",
type=str,
default=None,
help="Open on a specific Lightning AI BYOC compute cluster",
)
@click.option("--name", help="The name to use for the CloudSpace", default="", type=str)
def open(path: str, cluster_id: str, name: str) -> None:
"""Open files or folders on the cloud."""

if not os.path.exists(path):
click.echo(f"The provided path `{path}` doesn't exist.")
sys.exit(1)

runtime = CloudRuntime(entrypoint=Path(path))
runtime.open(name, cluster_id)


_main.add_command(get_list)
_main.add_command(delete)
_main.add_command(create)
Expand Down
195 changes: 151 additions & 44 deletions src/lightning_app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from lightning_app.runners.backends.cloud import CloudBackend
from lightning_app.runners.runtime import Runtime
from lightning_app.source_code import LocalSourceCodeDir
from lightning_app.source_code.copytree import _filter_ignored, _parse_lightningignore
from lightning_app.source_code.copytree import _filter_ignored, _IGNORE_FUNCTION, _parse_lightningignore
from lightning_app.storage import Drive, Mount
from lightning_app.utilities.app_helpers import _is_headless, Logger
from lightning_app.utilities.auth import _credential_string_to_basic_auth_params
Expand Down Expand Up @@ -106,6 +106,90 @@ def _to_clean_dict(swagger_object, map_attributes):
class CloudRuntime(Runtime):
backend: Union[str, CloudBackend] = "cloud"

def open(self, name: str, cluster_id: Optional[str] = None):
"""Method to open a CloudSpace with the root folder uploaded."""
try:
# Check for feature support
user = self.backend.client.auth_service_get_user()
if not user.features.code_tab:
print(
"The `lightning open` command has not been enabled for your account. "
"To request access, please contact [email protected]"
)
sys.exit(1)

# Dispatch in four phases: resolution, validation, spec creation, API transactions
# Resolution
cloudspace_config = self._resolve_config(name, load=False)
root = self._resolve_root()
ignore_functions = self._resolve_open_ignore_functions()
repo = self._resolve_repo(root, ignore_functions)
project = self._resolve_project()
existing_cloudspaces = self._resolve_existing_cloudspaces(project, cloudspace_config.name)
cluster_id = self._resolve_cluster_id(cluster_id, project.project_id, existing_cloudspaces)
existing_cloudspace, existing_run_instance = self._resolve_existing_run_instance(
cluster_id, project.project_id, existing_cloudspaces
)
cloudspace_name = self._resolve_cloudspace_name(
cloudspace_config.name,
existing_cloudspace,
existing_cloudspaces,
)
needs_credits = self._resolve_needs_credits(project)

# Validation
# Note: We do not validate the repo here since open only uploads a directory if asked explicitly
self._validate_cluster_id(cluster_id, project.project_id)

# Spec creation
run_body = self._get_run_body(cluster_id, [], None, [], True, root, self.start_server)

if existing_run_instance is not None:
print(
f"Re-opening the CloudSpace {cloudspace_config.name}. "
"This operation will create a new run but will not update your files."
)
else:
print(f"The name of the CloudSpace is: {cloudspace_config.name}")

# API transactions
cloudspace_id = self._api_create_cloudspace_if_not_exists(
project.project_id,
cloudspace_name,
existing_cloudspace,
)
self._api_stop_existing_run_instance(project.project_id, existing_run_instance)
run = self._api_create_run(project.project_id, cloudspace_id, run_body)
self._api_package_and_upload_repo(repo, run)

if getattr(run, "cluster_id", None):
print(f"Running on {run.cluster_id}")

# TODO: We shouldn't need to create an instance here
if existing_run_instance is not None:
run_instance = self._api_transfer_run_instance(
project.project_id,
run.id,
existing_run_instance.id,
V1LightningappInstanceState.STOPPED,
)
else:
run_instance = self._api_create_run_instance(
cluster_id,
project.project_id,
cloudspace_name,
cloudspace_id,
run.id,
V1LightningappInstanceState.STOPPED,
)

if "PYTEST_CURRENT_TEST" not in os.environ:
click.launch(self._get_app_url(run_instance, "code", needs_credits))

except ApiException as e:
logger.error(e.body)
sys.exit(1)

def dispatch(
self,
name: str = "",
Expand All @@ -116,10 +200,10 @@ def dispatch(
) -> None:
"""Method to dispatch and run the :class:`~lightning_app.core.app.LightningApp` in the cloud."""
# not user facing error ideally - this should never happen in normal user workflow
if not self.entrypoint_file:
if not self.entrypoint:
raise ValueError(
"Entrypoint file not provided. Did you forget to "
"initialize the Runtime object with `entrypoint_file` argument?"
"initialize the Runtime object with `entrypoint` argument?"
)

cleanup_handle = None
Expand Down Expand Up @@ -213,20 +297,20 @@ def dispatch(
env_vars,
auth,
)

if run_instance.status.phase == V1LightningappInstanceState.FAILED:
raise RuntimeError("Failed to create the application. Cannot upload the source code.")

# TODO: Remove testing dependency, but this would open a tab for each test...
if open_ui and "PYTEST_CURRENT_TEST" not in os.environ:
click.launch(self._get_app_url(run_instance, "logs" if run.is_headless else "web-ui", needs_credits))
except ApiException as e:
logger.error(e.body)
sys.exit(1)
finally:
if cleanup_handle:
cleanup_handle()

if run_instance.status.phase == V1LightningappInstanceState.FAILED:
raise RuntimeError("Failed to create the application. Cannot upload the source code.")

# TODO: Remove testing dependency, but this would open a tab for each test...
if open_ui and "PYTEST_CURRENT_TEST" not in os.environ:
click.launch(self._get_app_url(run_instance, needs_credits))

@classmethod
def load_app_from_file(cls, filepath: str) -> "LightningApp":
"""Load a LightningApp from a file, mocking the imports."""
Expand All @@ -248,36 +332,50 @@ def load_app_from_file(cls, filepath: str) -> "LightningApp":
del os.environ["LAI_RUNNING_IN_CLOUD"]
return app

def _resolve_config(self, name: Optional[str]) -> AppConfig:
def _resolve_config(self, name: Optional[str], load: bool = True) -> AppConfig:
"""Find and load the config file if it exists (otherwise create an empty config).

Override the name if provided.
"""
config_file = _get_config_file(self.entrypoint_file)
cloudspace_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
config_file = _get_config_file(self.entrypoint)
cloudspace_config = AppConfig.load_from_file(config_file) if config_file.exists() and load else AppConfig()
if name:
# Override the name if provided
cloudspace_config.name = name
return cloudspace_config

def _resolve_root(self) -> Path:
"""Determine the root of the project."""
return Path(self.entrypoint_file).absolute().parent

def _resolve_repo(self, root: Path) -> LocalSourceCodeDir:
root = Path(self.entrypoint).absolute()
if root.is_file():
root = root.parent
return root

def _resolve_open_ignore_functions(self) -> List[_IGNORE_FUNCTION]:
entrypoint = self.entrypoint.absolute()
if entrypoint.is_file():
return [lambda src, paths: [path for path in paths if path.absolute() == entrypoint]]
return []

def _resolve_repo(
self,
root: Path,
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
) -> LocalSourceCodeDir:
"""Gather and merge all lightningignores from the app children and create the ``LocalSourceCodeDir``
object."""

flow_lightningignores = [flow.lightningignore for flow in self.app.flows]
work_lightningignores = [work.lightningignore for work in self.app.works]
lightningignores = flow_lightningignores + work_lightningignores
if lightningignores:
merged = sum(lightningignores, tuple())
logger.debug(f"Found the following lightningignores: {merged}")
patterns = _parse_lightningignore(merged)
ignore_functions = [partial(_filter_ignored, root, patterns)]
else:
ignore_functions = None
if ignore_functions is None:
ignore_functions = []

if self.app is not None:
flow_lightningignores = [flow.lightningignore for flow in self.app.flows]
work_lightningignores = [work.lightningignore for work in self.app.works]
lightningignores = flow_lightningignores + work_lightningignores
if lightningignores:
merged = sum(lightningignores, tuple())
logger.debug(f"Found the following lightningignores: {merged}")
patterns = _parse_lightningignore(merged)
ignore_functions = [*ignore_functions, partial(_filter_ignored, root, patterns)]

return LocalSourceCodeDir(path=root, ignore_functions=ignore_functions)

Expand Down Expand Up @@ -562,7 +660,7 @@ def _get_run_body(
self,
cluster_id: str,
flow_servers: List[V1Flowserver],
network_configs: List[V1NetworkConfig],
network_configs: Optional[List[V1NetworkConfig]],
works: List[V1Work],
no_cache: bool,
root: Path,
Expand All @@ -571,24 +669,28 @@ def _get_run_body(
"""Get the specification of the run creation request."""
# The entry point file needs to be relative to the root of the uploaded source file directory,
# because the backend will invoke the lightning commands relative said source directory
app_entrypoint_file = Path(self.entrypoint_file).absolute().relative_to(root)
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)

run_body = CloudspaceIdRunsBody(
cluster_id=cluster_id,
app_entrypoint_file=str(app_entrypoint_file),
enable_app_server=start_server,
flow_servers=flow_servers,
network_config=network_configs,
user_requested_flow_compute_config=V1UserRequestedFlowComputeConfig(
name=self.app.flow_cloud_compute.name,
shm_size=self.app.flow_cloud_compute.shm_size,
preemptible=False,
),
works=works,
local_source=True,
is_headless=_is_headless(self.app),
)

if self.app is not None:
run_body.user_requested_flow_compute_config = V1UserRequestedFlowComputeConfig(
name=self.app.flow_cloud_compute.name,
shm_size=self.app.flow_cloud_compute.shm_size,
preemptible=False,
)

run_body.is_headless = _is_headless(self.app)

# if requirements file at the root of the repository is present,
# we pass just the file name to the backend, so backend can find it in the relative path
requirements_file = root / "requirements.txt"
Expand Down Expand Up @@ -695,9 +797,9 @@ def _api_transfer_run_instance(
run_id: str,
instance_id: str,
desired_state: V1LightningappInstanceState,
queue_server_type: V1QueueServerType,
env_vars: List[V1EnvVar],
auth: V1LightningAuth,
queue_server_type: Optional[V1QueueServerType] = None,
env_vars: Optional[List[V1EnvVar]] = None,
auth: Optional[V1LightningAuth] = None,
) -> Externalv1LightningappInstance:
"""Transfer an existing instance to the given run ID and update its specification.

Expand Down Expand Up @@ -732,9 +834,9 @@ def _api_create_run_instance(
cloudspace_id: str,
run_id: str,
desired_state: V1LightningappInstanceState,
queue_server_type: V1QueueServerType,
env_vars: List[V1EnvVar],
auth: V1LightningAuth,
queue_server_type: Optional[V1QueueServerType] = None,
env_vars: Optional[List[V1EnvVar]] = None,
auth: Optional[V1LightningAuth] = None,
) -> Externalv1LightningappInstance:
"""Create a new instance of the given run with the given specification."""
return self.backend.client.cloud_space_service_create_lightning_run_instance(
Expand Down Expand Up @@ -775,7 +877,12 @@ def _print_specs(run_body: CloudspaceIdRunsBody, print_format: str) -> None:
requirements_path = getattr(getattr(run_body.image_spec, "dependency_file_info", ""), "path", "")
logger.info(f"requirements_path: {requirements_path}")

@staticmethod
def _get_app_url(lightning_app_instance: Externalv1LightningappInstance, need_credits: bool = False) -> str:
def _get_app_url(
self,
run_instance: Externalv1LightningappInstance,
tab: str,
need_credits: bool = False,
) -> str:
user = self.backend.client.auth_service_get_user()
action = "?action=add_credits" if need_credits else ""
return f"{get_lightning_cloud_url()}/me/apps/{lightning_app_instance.id}{action}"
return f"{get_lightning_cloud_url()}/{user.username}/apps/{run_instance.id}/{tab}{action}"
11 changes: 6 additions & 5 deletions src/lightning_app/runners/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def dispatch(

runtime = runtime_cls(
app=app,
entrypoint_file=entrypoint_file,
entrypoint=entrypoint_file,
start_server=start_server,
host=host,
port=port,
Expand All @@ -90,8 +90,8 @@ def dispatch(

@dataclass
class Runtime:
app: LightningApp
entrypoint_file: Optional[Path] = None
app: Optional[LightningApp] = None
entrypoint: Optional[Path] = None
start_server: bool = True
host: str = APP_SERVER_HOST
port: int = APP_SERVER_PORT
Expand All @@ -107,9 +107,10 @@ class Runtime:

def __post_init__(self):
if isinstance(self.backend, str):
self.backend = BackendType(self.backend).get_backend(self.entrypoint_file)
self.backend = BackendType(self.backend).get_backend(self.entrypoint)

LightningFlow._attach_backend(self.app.root, self.backend)
if self.app is not None:
LightningFlow._attach_backend(self.app.root, self.backend)

def terminate(self) -> None:
"""This method is used to terminate all the objects (threads, processes, etc..) created by the app."""
Expand Down
Loading