Skip to content

Commit 05a8d54

Browse files
ethanwharrisBorda
authored andcommitted
[App] Support running on multiple clusters (#16016)
(cherry picked from commit d3a7226)
1 parent 1438255 commit 05a8d54

File tree

4 files changed

+223
-141
lines changed

4 files changed

+223
-141
lines changed

src/lightning_app/runners/cloud.py

Lines changed: 120 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import fnmatch
22
import json
33
import random
4+
import re
45
import string
56
import sys
67
import time
78
from dataclasses import dataclass
89
from functools import partial
910
from pathlib import Path
10-
from textwrap import dedent
1111
from typing import Any, List, Optional, Union
1212

1313
import click
@@ -20,6 +20,7 @@
2020
Externalv1LightningappInstance,
2121
Gridv1ImageSpec,
2222
V1BuildSpec,
23+
V1ClusterType,
2324
V1DependencyFileInfo,
2425
V1Drive,
2526
V1DriveSpec,
@@ -212,8 +213,6 @@ def dispatch(
212213
# Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files,
213214
# going up the directory structure. The root of the project is where the Lightning config file is located.
214215

215-
# TODO: verify lightning version
216-
# _verify_lightning_version()
217216
config_file = _get_config_file(self.entrypoint_file)
218217
app_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
219218
root = Path(self.entrypoint_file).absolute().parent
@@ -242,10 +241,6 @@ def dispatch(
242241
# Override the name if provided by the CLI
243242
app_config.name = name
244243

245-
if cluster_id:
246-
# Override the cluster ID if provided by the CLI
247-
app_config.cluster_id = cluster_id
248-
249244
print(f"The name of the app is: {app_config.name}")
250245

251246
v1_env_vars = [V1EnvVar(name=k, value=v) for k, v in self.env_vars.items()]
@@ -307,17 +302,92 @@ def dispatch(
307302
project = _get_project(self.backend.client)
308303

309304
try:
310-
list_apps_resp = self.backend.client.lightningapp_v2_service_list_lightningapps_v2(
311-
project_id=project.project_id, name=app_config.name
305+
if cluster_id is not None:
306+
# Verify that the cluster exists
307+
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
308+
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
309+
if cluster_id not in cluster_ids:
310+
raise ValueError(f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist.")
311+
312+
self._ensure_cluster_project_binding(project.project_id, cluster_id)
313+
314+
# Resolve the app name, instance, and cluster ID
315+
existing_instance = None
316+
app_name = app_config.name
317+
318+
# List existing instances
319+
# TODO: Add pagination, otherwise this could break if users have a lot of apps.
320+
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
321+
project_id=project.project_id
312322
)
313-
if list_apps_resp.lightningapps:
314-
# There can be only one app with unique project_id<>name pair
315-
lit_app = list_apps_resp.lightningapps[0]
316-
else:
317-
app_body = Body7(name=app_config.name, can_download_source_code=True)
323+
324+
# Seach for instances with the given name (possibly with some random characters appended)
325+
pattern = re.escape(f"{app_name}-") + ".{4}"
326+
instances = [
327+
lightningapp
328+
for lightningapp in find_instances_resp.lightningapps
329+
if lightningapp.name == app_name or (re.fullmatch(pattern, lightningapp.name) is not None)
330+
]
331+
332+
# If instances exist and cluster is None, mimic cluster selection logic to choose a default
333+
if cluster_id is None and len(instances) > 0:
334+
# Determine the cluster ID
335+
cluster_id = self._get_default_cluster(project.project_id)
336+
337+
# If an instance exists on the cluster with the same base name - restart it
338+
for instance in instances:
339+
if instance.spec.cluster_id == cluster_id:
340+
existing_instance = instance
341+
break
342+
343+
# If instances exist but not on the cluster - choose a randomised name
344+
if len(instances) > 0 and existing_instance is None:
345+
name_exists = True
346+
while name_exists:
347+
random_name = self._randomise_name(app_name)
348+
name_exists = any([instance.name == random_name for instance in instances])
349+
350+
app_name = random_name
351+
352+
# Create the app if it doesn't exist
353+
if existing_instance is None:
354+
app_body = Body7(name=app_name, can_download_source_code=True)
318355
lit_app = self.backend.client.lightningapp_v2_service_create_lightningapp_v2(
319356
project_id=project.project_id, body=app_body
320357
)
358+
app_id = lit_app.id
359+
else:
360+
app_id = existing_instance.spec.app_id
361+
362+
# check if user has sufficient credits to run an app
363+
# if so set the desired state to running otherwise, create the app in stopped state,
364+
# and open the admin ui to add credits and running the app.
365+
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
366+
app_release_desired_state = (
367+
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
368+
)
369+
if not has_sufficient_credits:
370+
logger.warn("You may need Lightning credits to run your apps on the cloud.")
371+
372+
# Stop the instance if it isn't stopped yet
373+
if existing_instance and existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
374+
# TODO(yurij): Implement release switching in the UI and remove this
375+
# We can only switch release of the stopped instance
376+
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
377+
project_id=project.project_id,
378+
id=existing_instance.id,
379+
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
380+
)
381+
# wait for the instance to stop for up to 150 seconds
382+
for _ in range(150):
383+
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
384+
project_id=project.project_id, id=existing_instance.id
385+
)
386+
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
387+
break
388+
time.sleep(1)
389+
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
390+
raise RuntimeError("Failed to stop the existing instance.")
321391

322392
network_configs: Optional[List[V1NetworkConfig]] = None
323393
if enable_multiple_works_in_default_container():
@@ -332,90 +402,18 @@ def dispatch(
332402
)
333403
initial_port += 1
334404

335-
# check if user has sufficient credits to run an app
336-
# if so set the desired state to running otherwise, create the app in stopped state,
337-
# and open the admin ui to add credits and running the app.
338-
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
339-
app_release_desired_state = (
340-
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
341-
)
342-
if not has_sufficient_credits:
343-
logger.warn("You may need Lightning credits to run your apps on the cloud.")
344-
345-
# right now we only allow a single instance of the app
346-
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
347-
project_id=project.project_id, app_id=lit_app.id
348-
)
349-
350405
queue_server_type = V1QueueServerType.UNSPECIFIED
351406
if CLOUD_QUEUE_TYPE == "http":
352407
queue_server_type = V1QueueServerType.HTTP
353408
elif CLOUD_QUEUE_TYPE == "redis":
354409
queue_server_type = V1QueueServerType.REDIS
355410

356-
existing_instance: Optional[Externalv1LightningappInstance] = None
357-
if find_instances_resp.lightningapps:
358-
existing_instance = find_instances_resp.lightningapps[0]
359-
360-
if not app_config.cluster_id:
361-
# Re-run the app on the same cluster
362-
app_config.cluster_id = existing_instance.spec.cluster_id
363-
364-
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
365-
# TODO(yurij): Implement release switching in the UI and remove this
366-
# We can only switch release of the stopped instance
367-
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
368-
project_id=project.project_id,
369-
id=existing_instance.id,
370-
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
371-
)
372-
# wait for the instance to stop for up to 150 seconds
373-
for _ in range(150):
374-
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
375-
project_id=project.project_id, id=existing_instance.id
376-
)
377-
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
378-
break
379-
time.sleep(1)
380-
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
381-
raise RuntimeError("Failed to stop the existing instance.")
382-
383-
if app_config.cluster_id is not None:
384-
# Verify that the cluster exists
385-
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
386-
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
387-
if app_config.cluster_id not in cluster_ids:
388-
if cluster_id:
389-
msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist."
390-
else:
391-
msg = (
392-
f"Your app last ran on cluster {app_config.cluster_id}, but that cluster "
393-
"doesn't exist anymore."
394-
)
395-
raise ValueError(msg)
396-
if existing_instance and existing_instance.spec.cluster_id != app_config.cluster_id:
397-
raise ValueError(
398-
dedent(
399-
f"""\
400-
An app names {app_config.name} is already running on cluster {existing_instance.spec.cluster_id}, and you requested it to run on cluster {app_config.cluster_id}.
401-
402-
In order to proceed, please either:
403-
a. rename the app to run on {app_config.cluster_id} with the --name option
404-
lightning run app {app_entrypoint_file} --name (new name) --cloud --cluster-id {app_config.cluster_id}
405-
b. delete the app running on {existing_instance.spec.cluster_id} in the UI before running this command.
406-
""" # noqa: E501
407-
)
408-
)
409-
410-
if app_config.cluster_id is not None:
411-
self._ensure_cluster_project_binding(project.project_id, app_config.cluster_id)
412-
413411
release_body = Body8(
414412
app_entrypoint_file=app_spec.app_entrypoint_file,
415413
enable_app_server=app_spec.enable_app_server,
416414
flow_servers=app_spec.flow_servers,
417415
image_spec=app_spec.image_spec,
418-
cluster_id=app_config.cluster_id,
416+
cluster_id=cluster_id,
419417
network_config=network_configs,
420418
works=works,
421419
local_source=True,
@@ -426,14 +424,13 @@ def dispatch(
426424

427425
# create / upload the new app release
428426
lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
429-
project_id=project.project_id, app_id=lit_app.id, body=release_body
427+
project_id=project.project_id, app_id=app_id, body=release_body
430428
)
431429

432430
if lightning_app_release.source_upload_url == "":
433431
raise RuntimeError("The source upload url is empty.")
434432

435433
if getattr(lightning_app_release, "cluster_id", None):
436-
app_config.cluster_id = lightning_app_release.cluster_id
437434
logger.info(f"Running app on {lightning_app_release.cluster_id}")
438435

439436
# Save the config for re-runs
@@ -442,7 +439,7 @@ def dispatch(
442439
repo.package()
443440
repo.upload(url=lightning_app_release.source_upload_url)
444441

445-
if find_instances_resp.lightningapps:
442+
if existing_instance is not None:
446443
lightning_app_instance = (
447444
self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release(
448445
project_id=project.project_id,
@@ -466,12 +463,12 @@ def dispatch(
466463
lightning_app_instance = (
467464
self.backend.client.lightningapp_v2_service_create_lightningapp_release_instance(
468465
project_id=project.project_id,
469-
app_id=lit_app.id,
466+
app_id=app_id,
470467
id=lightning_app_release.id,
471468
body=Body9(
472-
cluster_id=app_config.cluster_id,
469+
cluster_id=cluster_id,
473470
desired_state=app_release_desired_state,
474-
name=lit_app.name,
471+
name=app_name,
475472
env=v1_env_vars,
476473
queue_server_type=queue_server_type,
477474
),
@@ -504,6 +501,36 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str):
504501
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id),
505502
)
506503

504+
def _get_default_cluster(self, project_id: str) -> str:
505+
"""This utility implements a minimal version of the cluster selection logic used in the cloud.
506+
507+
TODO: This should be requested directly from the platform.
508+
"""
509+
cluster_bindings = self.backend.client.projects_service_list_project_cluster_bindings(
510+
project_id=project_id
511+
).clusters
512+
513+
if not cluster_bindings:
514+
raise ValueError(f"No clusters are bound to the project {project_id}.")
515+
516+
if len(cluster_bindings) == 1:
517+
return cluster_bindings[0].cluster_id
518+
519+
clusters = [
520+
self.backend.client.cluster_service_get_cluster(cluster_binding.cluster_id)
521+
for cluster_binding in cluster_bindings
522+
]
523+
524+
# Filter global clusters
525+
clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL]
526+
527+
return random.choice(clusters).id
528+
529+
@staticmethod
530+
def _randomise_name(app_name: str) -> str:
531+
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
532+
return app_name + "-" + "".join(random.sample(letters, 4))
533+
507534
@staticmethod
508535
def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None:
509536
"""This method is used to inform the users if their folder files are large and how to filter them."""

src/lightning_app/utilities/packaging/app_config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pathlib
22
from dataclasses import asdict, dataclass, field
3-
from typing import Optional, Union
3+
from typing import Union
44

55
import yaml
66

@@ -18,7 +18,6 @@ class AppConfig:
1818
"""
1919

2020
name: str = field(default_factory=get_unique_name)
21-
cluster_id: Optional[str] = field(default=None)
2221

2322
def save_to_file(self, path: Union[str, pathlib.Path]) -> None:
2423
"""Save the configuration to the given file in YAML format."""
@@ -35,6 +34,8 @@ def load_from_file(cls, path: Union[str, pathlib.Path]) -> "AppConfig":
3534
"""Load the configuration from the given file."""
3635
with open(path) as file:
3736
config = yaml.safe_load(file)
37+
# Ignore `cluster_id` without error for backwards compatibility.
38+
config.pop("cluster_id", None)
3839
return cls(**config)
3940

4041
@classmethod

tests/tests_app/cli/test_cloud_cli.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from lightning_cloud.openapi import (
1212
V1LightningappV2,
1313
V1ListLightningappInstancesResponse,
14-
V1ListLightningappsV2Response,
1514
V1ListMembershipsResponse,
1615
V1Membership,
1716
)
@@ -102,8 +101,8 @@ def __init__(self, *args, create_response, **kwargs):
102101
super().__init__()
103102
self.create_response = create_response
104103

105-
def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
106-
return V1ListLightningappsV2Response(lightningapps=[V1LightningappV2(id="my_app", name="app")])
104+
def lightningapp_v2_service_create_lightningapp_v2(self, *args, **kwargs):
105+
return V1LightningappV2(id="my_app", name="app")
107106

108107
def lightningapp_v2_service_create_lightningapp_release(self, project_id, app_id, body):
109108
assert project_id == "test-project-id"
@@ -183,7 +182,7 @@ def __init__(self, *args, message, **kwargs):
183182
super().__init__()
184183
self.message = message
185184

186-
def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
185+
def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs):
187186
raise ApiException(
188187
http_resp=HttpHeaderDict(
189188
data=self.message,

0 commit comments

Comments
 (0)