15
15
import tarfile
16
16
import tempfile
17
17
from pathlib import Path
18
- from typing import Dict , List , Optional
18
+ from typing import Any , Dict , List , Optional
19
19
from urllib .parse import urlparse
20
20
21
21
import requests
24
24
from fastapi .middleware .cors import CORSMiddleware
25
25
from pydantic import BaseModel
26
26
27
+ from lightning .app .core import constants
28
+ from lightning .app .plugin .actions import _Action
27
29
from lightning .app .utilities .app_helpers import Logger
28
30
from lightning .app .utilities .component import _set_flow_context
29
31
from lightning .app .utilities .enum import AppStage
@@ -41,16 +43,20 @@ def __init__(self) -> None:
41
43
self .cloudspace_id = ""
42
44
self .cluster_id = ""
43
45
44
- def run (self , * args : str , ** kwargs : str ) -> None :
46
+ def run (self , * args : str , ** kwargs : str ) -> Optional [ List [ _Action ]] :
45
47
"""Override with the logic to execute on the cloudspace."""
48
+ raise NotImplementedError
46
49
47
- def run_job (self , name : str , app_entrypoint : str , env_vars : Optional [Dict [str , str ]] = None ) -> None :
50
+ def run_job (self , name : str , app_entrypoint : str , env_vars : Optional [Dict [str , str ]] = None ) -> str :
48
51
"""Run a job in the cloudspace associated with this plugin.
49
52
50
53
Args:
51
54
name: The name of the job.
52
55
app_entrypoint: The path of the file containing the app to run.
53
56
env_vars: Additional env vars to set when running the app.
57
+
58
+ Returns:
59
+ The relative URL of the created job.
54
60
"""
55
61
from lightning .app .runners .cloud import CloudRuntime
56
62
@@ -74,12 +80,14 @@ def run_job(self, name: str, app_entrypoint: str, env_vars: Optional[Dict[str, s
74
80
# Used to indicate Lightning has been dispatched
75
81
os .environ ["LIGHTNING_DISPATCHED" ] = "1"
76
82
77
- runtime .cloudspace_dispatch (
83
+ url = runtime .cloudspace_dispatch (
78
84
project_id = self .project_id ,
79
85
cloudspace_id = self .cloudspace_id ,
80
86
name = name ,
81
87
cluster_id = self .cluster_id ,
82
88
)
89
+ # Return a relative URL so it can be used with the NavigateTo action.
90
+ return url .replace (constants .get_lightning_cloud_url (), "" )
83
91
84
92
def _setup (
85
93
self ,
@@ -101,7 +109,7 @@ class _Run(BaseModel):
101
109
plugin_arguments : Dict [str , str ]
102
110
103
111
104
- def _run_plugin (run : _Run ) -> List :
112
+ def _run_plugin (run : _Run ) -> Dict [ str , Any ] :
105
113
"""Create a run with the given name and entrypoint under the cloudspace with the given ID."""
106
114
with tempfile .TemporaryDirectory () as tmpdir :
107
115
download_path = os .path .join (tmpdir , "source.tar.gz" )
@@ -115,6 +123,9 @@ def _run_plugin(run: _Run) -> List:
115
123
116
124
response = requests .get (source_code_url )
117
125
126
+ # TODO: Backoff retry a few times in case the URL is flaky
127
+ response .raise_for_status ()
128
+
118
129
with open (download_path , "wb" ) as f :
119
130
f .write (response .content )
120
131
except Exception as e :
@@ -152,17 +163,15 @@ def _run_plugin(run: _Run) -> List:
152
163
cloudspace_id = run .cloudspace_id ,
153
164
cluster_id = run .cluster_id ,
154
165
)
155
- plugin .run (** run .plugin_arguments )
166
+ actions = plugin .run (** run .plugin_arguments ) or []
167
+ return {"actions" : [action .to_spec ().to_dict () for action in actions ]}
156
168
except Exception as e :
157
169
raise HTTPException (
158
170
status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = f"Error running plugin: { str (e )} ."
159
171
)
160
172
finally :
161
173
os .chdir (cwd )
162
174
163
- # TODO: Return actions from the plugin here
164
- return []
165
-
166
175
167
176
def _start_plugin_server (host : str , port : int ) -> None :
168
177
"""Start the plugin server which can be used to dispatch apps or run plugins."""
0 commit comments