3
3
import time
4
4
from datetime import datetime
5
5
from textwrap import dedent
6
- from typing import Any , List
6
+ from typing import Any , List , Union
7
7
8
8
import click
9
+ import lightning_cloud
9
10
from lightning_cloud .openapi import (
10
11
Externalv1Cluster ,
11
12
V1AWSClusterDriverSpec ,
15
16
V1ClusterState ,
16
17
V1ClusterType ,
17
18
V1CreateClusterRequest ,
19
+ V1GetClusterResponse ,
18
20
V1KubernetesClusterDriver ,
19
21
)
22
+ from lightning_utilities .core .enums import StrEnum
20
23
from rich .console import Console
21
24
from rich .table import Table
22
25
from rich .text import Text
25
28
from lightning_app .utilities .network import LightningClient
26
29
from lightning_app .utilities .openapi import create_openapi_object , string2dict
27
30
28
- CLUSTER_STATE_CHECKING_TIMEOUT = 60
29
31
MAX_CLUSTER_WAIT_TIME = 5400
30
32
31
33
34
class ClusterState(StrEnum):
    """Short, lowercase cluster states used in messages shown to the CLI user.

    Each member's value is the word printed for that state; note that
    ``FAILED`` is rendered as ``"error"``.
    """

    UNSPECIFIED = "unspecified"
    QUEUED = "queued"
    PENDING = "pending"
    RUNNING = "running"
    FAILED = "error"
    DELETED = "deleted"

    def __str__(self) -> str:
        """Return the member's value as a plain string."""
        return str(self.value)

    @classmethod
    def from_api(cls, status: V1ClusterState) -> "ClusterState":
        """Translate an API cluster state into its CLI counterpart.

        The string form of ``status`` is lowercased and split on at most two
        underscores; the final piece is handed to ``from_str`` for lookup.

        Args:
            status: Cluster state as reported by the API.

        Returns:
            Whatever ``from_str`` resolves for the trailing piece.
        """
        # presumably API states look like ``CLUSTER_STATE_RUNNING`` so the
        # trailing piece is the bare state word — TODO confirm
        pieces = str(status).lower().split("_", maxsplit=2)
        return cls.from_str(pieces[-1])
49
+
50
+
32
51
class ClusterList (Formatable ):
33
52
def __init__ (self , clusters : List [Externalv1Cluster ]):
34
53
self .clusters = clusters
@@ -86,7 +105,7 @@ def create(
86
105
region : str = "us-east-1" ,
87
106
external_id : str = None ,
88
107
edit_before_creation : bool = False ,
89
- wait : bool = False ,
108
+ do_async : bool = False ,
90
109
) -> None :
91
110
"""request Lightning AI BYOC compute cluster creation.
92
111
@@ -97,7 +116,7 @@ def create(
97
116
region: AWS region containing compute resources
98
117
external_id: AWS IAM Role external ID
99
118
edit_before_creation: Enables interactive editing of requests before submitting it to Lightning AI.
100
- wait: Waits for the cluster to be in a RUNNING state. Only use this for debugging.
119
+ do_async: Triggers cluster creation in the background and exits
101
120
"""
102
121
performance_profile = V1ClusterPerformanceProfile .DEFAULT
103
122
if cost_savings :
@@ -130,22 +149,31 @@ def create(
130
149
click .echo ("cluster unchanged" )
131
150
132
151
resp = self .api_client .cluster_service_create_cluster (body = new_body )
133
- if wait :
134
- _wait_for_cluster_state (self .api_client , resp .id , V1ClusterState .RUNNING )
135
-
136
152
click .echo (
137
153
dedent (
138
154
f"""\
139
- { resp .id } is now being created... This can take up to an hour.
155
+ BYOC cluster creation triggered successfully!
156
+ This can take up to an hour to complete.
140
157
141
158
To view the status of your clusters use:
142
- ` lightning list clusters`
159
+ lightning list clusters
143
160
144
161
To view cluster logs use:
145
- `lightning show cluster logs { resp .id } `
146
- """
162
+ lightning show cluster logs { cluster_name }
163
+
164
+ To delete the cluster run:
165
+ lightning delete cluster { cluster_name }
166
+ """
147
167
)
148
168
)
169
+ background_message = "\n Cluster will be created in the background!"
170
+ if do_async :
171
+ click .echo (background_message )
172
+ else :
173
+ try :
174
+ _wait_for_cluster_state (self .api_client , resp .id , V1ClusterState .RUNNING )
175
+ except KeyboardInterrupt :
176
+ click .echo (background_message )
149
177
150
178
def get_clusters (self ) -> ClusterList :
151
179
resp = self .api_client .cluster_service_list_clusters (phase_not_in = [V1ClusterState .DELETED ])
@@ -156,7 +184,7 @@ def list(self) -> None:
156
184
console = Console ()
157
185
console .print (clusters .as_table ())
158
186
159
- def delete (self , cluster_id : str , force : bool = False , wait : bool = False ) -> None :
187
+ def delete (self , cluster_id : str , force : bool = False , do_async : bool = False ) -> None :
160
188
if force :
161
189
click .echo (
162
190
"""
@@ -167,47 +195,86 @@ def delete(self, cluster_id: str, force: bool = False, wait: bool = False) -> No
167
195
)
168
196
click .confirm ("Do you want to continue?" , abort = True )
169
197
198
+ resp : V1GetClusterResponse = self .api_client .cluster_service_get_cluster (id = cluster_id )
199
+ bucket_name = resp .spec .driver .kubernetes .aws .bucket_name
200
+
170
201
self .api_client .cluster_service_delete_cluster (id = cluster_id , force = force )
171
- click .echo ("Cluster deletion triggered successfully" )
202
+ click .echo (
203
+ dedent (
204
+ f"""\
205
+ Cluster deletion triggered successfully
206
+
207
+ For safety purposes we will not delete anything in the S3 bucket associated with the cluster:
208
+ { bucket_name }
172
209
173
- if wait :
174
- _wait_for_cluster_state (self .api_client , cluster_id , V1ClusterState .DELETED )
210
+ You may want to delete it manually using the AWS CLI:
211
+ aws s3 rb --force s3://{ bucket_name }
212
+ """
213
+ )
214
+ )
215
+
216
+ background_message = "\n Cluster will be deleted in the background!"
217
+ if do_async :
218
+ click .echo (background_message )
219
+ else :
220
+ try :
221
+ _wait_for_cluster_state (self .api_client , cluster_id , V1ClusterState .DELETED )
222
+ except KeyboardInterrupt :
223
+ click .echo (background_message )
175
224
176
225
177
226
def _wait_for_cluster_state(
    api_client: LightningClient,
    cluster_id: str,
    target_state: V1ClusterState,
    timeout_seconds: int = MAX_CLUSTER_WAIT_TIME,
    poll_duration_seconds: int = 10,
) -> None:
    """_wait_for_cluster_state waits until the provided cluster has reached a desired state, or times out.

    Messages will be displayed to the user as the cluster changes state.
    We poll the API server for any changes. A cluster reporting a failed phase
    does not end the wait: its status message is printed and polling continues
    until the target state is reached or the timeout elapses.

    Args:
        api_client: LightningClient used for polling
        cluster_id: Specifies the cluster to wait for
        target_state: Specifies the desired state the target cluster needs to meet
        timeout_seconds: Maximum duration to wait (in seconds)
        poll_duration_seconds: duration between polling for the cluster state (in seconds)

    Raises:
        click.ClickException: If the cluster did not reach ``target_state``
            within ``timeout_seconds``.
        lightning_cloud.openapi.rest.ApiException: Any API error, except a 404
            received while waiting for deletion (treated as success).
    """
    start = time.time()
    elapsed = 0

    click.echo(f"Waiting for cluster to be {ClusterState.from_api(target_state)}...")
    while elapsed < timeout_seconds:
        # Only the API call is guarded, so the handler cannot mask errors
        # raised by the status formatting or the sleep below.
        try:
            resp: V1GetClusterResponse = api_client.cluster_service_get_cluster(id=cluster_id)
        except lightning_cloud.openapi.rest.ApiException as e:
            # A cluster that can no longer be found counts as deleted.
            if e.status == 404 and target_state == V1ClusterState.DELETED:
                return
            raise

        click.echo(_cluster_status_long(cluster=resp, desired_state=target_state, elapsed=elapsed))
        if resp.status.phase == target_state:
            break
        time.sleep(poll_duration_seconds)
        elapsed = int(time.time() - start)
    else:
        # Reached only when the loop ran out of time without hitting ``break``.
        state_str = ClusterState.from_api(target_state)
        # NOTE(review): "[email protected]" below looks like a scrape-obfuscated
        # e-mail address — confirm the intended support contact.
        raise click.ClickException(
            dedent(
                f"""\
                The cluster has not entered the {state_str} state within {_format_elapsed_seconds(timeout_seconds)}.

                The cluster may eventually be {state_str} afterwards, please check its status using:
                    lightning list clusters

                To view cluster logs use:
                    lightning show cluster logs {cluster_id}

                Contact [email protected] for additional help
                """
            )
        )
211
278
212
279
213
280
def _check_cluster_name_is_valid (_ctx : Any , _param : Any , value : str ) -> str :
@@ -219,3 +286,76 @@ def _check_cluster_name_is_valid(_ctx: Any, _param: Any, value: str) -> str:
219
286
Provide a cluster name using valid characters and try again."""
220
287
)
221
288
return value
289
+
290
+
291
def _cluster_status_long(cluster: V1GetClusterResponse, desired_state: V1ClusterState, elapsed: float) -> str:
    """Build a long-form status message for the user about the cluster state.

    The message is returned, not printed. A failed cluster always yields the
    error message, whatever the desired state is.

    Args:
        cluster: The cluster object
        desired_state: The state the caller is waiting for (RUNNING or DELETED)
        elapsed: Seconds since we've started polling

    Returns:
        The message describing the current progress towards ``desired_state``.

    Raises:
        click.ClickException: If the cluster has not failed and
            ``desired_state`` is neither RUNNING nor DELETED.
    """
    cluster_name = cluster.name
    current_state = cluster.status.phase
    current_reason = cluster.status.reason

    duration = _format_elapsed_seconds(elapsed)

    if current_state == V1ClusterState.FAILED:
        # NOTE(review): "[email protected]" below looks like a scrape-obfuscated
        # e-mail address — confirm the intended support contact.
        return dedent(
            f"""\
            The requested cluster operation for cluster {cluster_name} has errors:
            {current_reason}

            ---
            We are automatically retrying, and an automated alert has been created

            WARNING: Any non-deleted cluster may be using resources.
            To avoid incurring cost on your cloud provider, delete the cluster using the following command:
                lightning delete cluster {cluster_name}

            Contact [email protected] for additional help
            """
        )

    if desired_state == current_state == V1ClusterState.RUNNING:
        return dedent(
            f"""\
            Cluster {cluster_name} is now running and ready to use.
            To launch an app on this cluster use: lightning run app app.py --cloud --cluster-id {cluster_name}
            """
        )

    if desired_state == V1ClusterState.RUNNING:
        return f"Cluster {cluster_name} is being created [elapsed={duration}]"

    if desired_state == current_state == V1ClusterState.DELETED:
        # The bucket name is looked up only here, the one message that needs
        # it, so the other messages do not depend on the AWS driver spec
        # being present in the response.
        bucket_name = cluster.spec.driver.kubernetes.aws.bucket_name
        return dedent(
            f"""\
            Cluster {cluster_name} has been successfully deleted, and almost all AWS resources have been removed

            For safety purposes we kept the S3 bucket associated with the cluster: {bucket_name}

            You may want to delete it manually using the AWS CLI:
                aws s3 rb --force s3://{bucket_name}
            """
        )

    if desired_state == V1ClusterState.DELETED:
        return f"Cluster {cluster_name} is being deleted [elapsed={duration}]"

    raise click.ClickException(f"Unknown cluster desired state {desired_state}")
350
+
351
+
352
+ def _format_elapsed_seconds (seconds : Union [float , int ]) -> str :
353
+ """Turns seconds into a duration string.
354
+
355
+ >>> _format_elapsed_seconds(5)
356
+ '05s'
357
+ >>> _format_elapsed_seconds(60)
358
+ '01m00s'
359
+ """
360
+ minutes , seconds = divmod (seconds , 60 )
361
+ return (f"{ minutes :02} m" if minutes else "" ) + f"{ seconds :02} s"
0 commit comments