Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Fixed a bug where `AutoScaler` would fail with min_replica=0 ([#16092](https://github.com/Lightning-AI/lightning/pull/16092)


- Fixed a non-thread safe deepcopy in the scheduler ([#16114](https://github.com/Lightning-AI/lightning/pull/16114))

- Fixed Http Queue sleeping for 1 sec by default if no delta were found ([#16114](https://github.com/Lightning-AI/lightning/pull/16114))


## [1.8.4] - 2022-12-08

### Added
Expand Down
4 changes: 2 additions & 2 deletions src/lightning_app/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
from lightning_app.api.http_methods import _HttpMethod
from lightning_app.api.request_types import _DeltaRequest
from lightning_app.core.constants import (
CLOUD_QUEUE_TYPE,
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
ENABLE_STATE_WEBSOCKET,
ENABLE_UPLOAD_ENDPOINT,
FRONTEND_DIR,
get_cloud_queue_type,
)
from lightning_app.core.queues import QueuingSystem
from lightning_app.storage import Drive
Expand Down Expand Up @@ -350,7 +350,7 @@ async def healthz(response: Response):
"""Health check endpoint used in the cloud FastAPI servers to check the status periodically."""
# check the queue status only if running in cloud
if is_running_in_cloud():
queue_obj = QueuingSystem(CLOUD_QUEUE_TYPE).get_queue(queue_name="healthz")
queue_obj = QueuingSystem(get_cloud_queue_type()).get_queue(queue_name="healthz")
# this is only being implemented on Redis Queue. For HTTP Queue, it doesn't make sense to have every single
# app checking the status of the Queue server
if not queue_obj.is_running:
Expand Down
2 changes: 2 additions & 0 deletions src/lightning_app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque
deltas.append(delta)
else:
api_or_command_request_deltas.append(delta)
else:
break

if api_or_command_request_deltas:
_process_requests(self, api_or_command_request_deltas)
Expand Down
8 changes: 6 additions & 2 deletions src/lightning_app/core/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from pathlib import Path
from typing import Optional

import lightning_cloud.env

Expand All @@ -13,7 +14,7 @@ def get_lightning_cloud_url() -> str:

SUPPORTED_PRIMITIVE_TYPES = (type(None), str, int, float, bool)
STATE_UPDATE_TIMEOUT = 0.001
STATE_ACCUMULATE_WAIT = 0.05
STATE_ACCUMULATE_WAIT = 0.15
# Duration in seconds of a moving average of a full flow execution
# beyond which an exception is raised.
FLOW_DURATION_THRESHOLD = 1.0
Expand All @@ -25,7 +26,6 @@ def get_lightning_cloud_url() -> str:
APP_SERVER_PORT = _find_lit_app_port(7501)
APP_STATE_MAX_SIZE_BYTES = 1024 * 1024 # 1 MB

CLOUD_QUEUE_TYPE = os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None)
WARNING_QUEUE_SIZE = 1000
# different flag because queue debug can be very noisy, and almost always not useful unless debugging the queue itself.
QUEUE_DEBUG_ENABLED = bool(int(os.getenv("LIGHTNING_QUEUE_DEBUG_ENABLED", "0")))
Expand Down Expand Up @@ -77,5 +77,9 @@ def enable_multiple_works_in_default_container() -> bool:
return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))


def get_cloud_queue_type() -> Optional[str]:
return os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None)


# Number of seconds to wait between filesystem checks when waiting for files in remote storage
REMOTE_STORAGE_WAIT = 0.5
3 changes: 1 addition & 2 deletions src/lightning_app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,11 @@ def get(self, timeout: int = None) -> Any:

# timeout is some value - loop until the timeout is reached
start_time = time.time()
timeout += 0.1 # add 0.1 seconds as a safe margin
while (time.time() - start_time) < timeout:
try:
return self._get()
except queue.Empty:
time.sleep(HTTP_QUEUE_REFRESH_INTERVAL)
pass

def _get(self):
resp = self.client.post(f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "pop"})
Expand Down
11 changes: 7 additions & 4 deletions src/lightning_app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from lightning_app import LightningWork
from lightning_app.core.app import LightningApp
from lightning_app.core.constants import (
CLOUD_QUEUE_TYPE,
CLOUD_UPLOAD_WARNING,
DEFAULT_NUMBER_OF_EXPOSED_PORTS,
DISABLE_DEPENDENCY_CACHE,
Expand All @@ -60,6 +59,7 @@
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER,
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
get_cloud_queue_type,
get_lightning_cloud_url,
)
from lightning_app.runners.backends.cloud import CloudBackend
Expand Down Expand Up @@ -418,9 +418,11 @@ def dispatch(
initial_port += 1

queue_server_type = V1QueueServerType.UNSPECIFIED
if CLOUD_QUEUE_TYPE == "http":
# Note: Enable app to select their own queue type.
queue_type = get_cloud_queue_type()
if queue_type == "http":
queue_server_type = V1QueueServerType.HTTP
elif CLOUD_QUEUE_TYPE == "redis":
elif queue_type == "redis":
queue_server_type = V1QueueServerType.REDIS

release_body = Body8(
Expand Down Expand Up @@ -496,7 +498,8 @@ def dispatch(
if lightning_app_instance.status.phase == V1LightningappInstanceState.FAILED:
raise RuntimeError("Failed to create the application. Cannot upload the source code.")

if open_ui:
# TODO: Remove testing dependency, but this would open a tab for each test...
if open_ui and "PYTEST_CURRENT_TEST" not in os.environ:
click.launch(self._get_app_url(lightning_app_instance, not has_sufficient_credits))

if cleanup_handle:
Expand Down
11 changes: 5 additions & 6 deletions src/lightning_app/utilities/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ def update_index_file(ui_root: str, info: Optional[AppInfo] = None, root_path: s
entry_file = Path(ui_root) / "index.html"
original_file = Path(ui_root) / "index.original.html"

if root_path:
if not original_file.exists():
shutil.copyfile(entry_file, original_file) # keep backup
else:
# revert index.html in case it was modified after creating original.html
shutil.copyfile(original_file, entry_file)
if not original_file.exists():
shutil.copyfile(entry_file, original_file) # keep backup
else:
# revert index.html in case it was modified after creating original.html
shutil.copyfile(original_file, entry_file)

if info:
with original_file.open() as f:
Expand Down
8 changes: 7 additions & 1 deletion src/lightning_app/utilities/packaging/cloud_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class CloudCompute:
name: str = "default"
disk_size: int = 0
idle_timeout: Optional[int] = None
shm_size: Optional[int] = 0
shm_size: Optional[int] = None
mounts: Optional[Union[Mount, List[Mount]]] = None
_internal_id: Optional[str] = None

Expand All @@ -80,6 +80,12 @@ def __post_init__(self) -> None:

self.name = self.name.lower()

if self.shm_size is None:
if "gpu" in self.name:
self.shm_size = 1024
else:
self.shm_size = 0

# All `default` CloudCompute are identified in the same way.
if self._internal_id is None:
self._internal_id = self._generate_id()
Expand Down
15 changes: 9 additions & 6 deletions src/lightning_app/utilities/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import threading
from copy import deepcopy
from datetime import datetime
from typing import Optional

from croniter import croniter
from deepdiff import DeepDiff, Delta
from deepdiff import Delta

from lightning_app.utilities.proxies import ComponentDelta

Expand Down Expand Up @@ -34,11 +33,15 @@ def run_once(self):
next_event = croniter(metadata["cron_pattern"], start_time).get_next(datetime)
# When the event is reached, send a delta to activate scheduling.
if current_date > next_event:
flow = self._app.get_component_by_name(metadata["name"])
previous_state = deepcopy(flow.state)
flow._enable_schedule(call_hash)
component_delta = ComponentDelta(
id=flow.name, delta=Delta(DeepDiff(previous_state, flow.state, verbose_level=2))
id=metadata["name"],
delta=Delta(
{
"values_changed": {
f"root['calls']['scheduling']['{call_hash}']['running']": {"new_value": True}
}
}
),
)
self._app.delta_queue.put(component_delta)
metadata["start_time"] = next_event.isoformat()
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/core/test_lightning_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ async def test_health_endpoint_success():
@pytest.mark.anyio
async def test_health_endpoint_failure(monkeypatch):
monkeypatch.setenv("LIGHTNING_APP_STATE_URL", "http://someurl") # adding this to make is_running_in_cloud pass
monkeypatch.setattr(api, "CLOUD_QUEUE_TYPE", "redis")
monkeypatch.setitem(os.environ, "LIGHTNING_CLOUD_QUEUE_TYPE", "redis")
async with AsyncClient(app=fastapi_service, base_url="http://test") as client:
# will respond 503 if redis is not running
response = await client.get("/healthz")
Expand Down
17 changes: 16 additions & 1 deletion tests/tests_app/core/test_lightning_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import pickle
from re import escape
from time import sleep
from time import sleep, time
from unittest import mock

import pytest
Expand Down Expand Up @@ -482,6 +482,21 @@ def make_delta(i):
assert generated > expect


def test_lightning_app_aggregation_empty():
"""Verify the while loop exits before `state_accumulate_wait` is reached if no deltas are found."""

class SlowQueue(MultiProcessQueue):
def get(self, timeout):
out = super().get(timeout)
return out

app = LightningApp(EmptyFlow())
app.delta_queue = SlowQueue("api_delta_queue", 0)
t0 = time()
assert app._collect_deltas_from_ui_and_work_queues() == []
assert (time() - t0) < app.state_accumulate_wait


class SimpleFlow2(LightningFlow):
def __init__(self):
super().__init__()
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def run(self):
def test_scheduling_api():

app = LightningApp(FlowSchedule())
MultiProcessRuntime(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=False).dispatch()


def test_lightning_flow():
Expand Down
3 changes: 2 additions & 1 deletion tests/tests_app/runners/backends/test_mp_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from lightning_app.runners.backends import MultiProcessingBackend


@mock.patch("lightning_app.core.app.AppStatus")
@mock.patch("lightning_app.runners.backends.mp_process.multiprocessing")
def test_backend_create_work_with_set_start_method(multiprocessing_mock):
def test_backend_create_work_with_set_start_method(multiprocessing_mock, *_):
backend = MultiProcessingBackend(entrypoint_file="fake.py")
work = Mock(spec=LightningWork)
work._start_method = "test_start_method"
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/runners/test_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ def test_call_with_queue_server_type_specified(self, lightningapps, monkeypatch,
)

# calling with env variable set to http
monkeypatch.setattr(cloud, "CLOUD_QUEUE_TYPE", "http")
monkeypatch.setitem(os.environ, "LIGHTNING_CLOUD_QUEUE_TYPE", "http")
cloud_runtime.backend.client.reset_mock()
cloud_runtime.dispatch()
body = IdGetBody(
Expand Down
6 changes: 6 additions & 0 deletions tests/tests_app/utilities/packaging/test_cloud_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ def test_cloud_compute_shared_memory():
cloud_compute = CloudCompute("gpu", shm_size=1100)
assert cloud_compute.shm_size == 1100

cloud_compute = CloudCompute("gpu")
assert cloud_compute.shm_size == 1024

cloud_compute = CloudCompute("cpu")
assert cloud_compute.shm_size == 0


def test_cloud_compute_with_mounts():
mount_1 = Mount(source="s3://foo/", mount_path="/foo")
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/utilities/test_load_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_extract_metadata_from_component():
"name": "gpu",
"disk_size": 0,
"idle_timeout": None,
"shm_size": 0,
"shm_size": 1024,
"mounts": None,
"_internal_id": ANY,
},
Expand Down