From 9a32f28652908c3cd46a41f18b629e57b02b5dc9 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 10:49:35 +0000 Subject: [PATCH 01/21] Remove SingleProcessRuntime --- src/lightning_app/runners/__init__.py | 2 - src/lightning_app/runners/runtime_type.py | 6 +- src/lightning_app/runners/singleprocess.py | 62 ------------------- tests/tests_app/core/test_lightning_api.py | 35 +---------- tests/tests_app/core/test_lightning_app.py | 55 +--------------- tests/tests_app/core/test_lightning_flow.py | 8 +-- tests/tests_app/runners/test_singleprocess.py | 35 ----------- tests/tests_app/structures/test_structures.py | 4 +- 8 files changed, 12 insertions(+), 195 deletions(-) delete mode 100644 src/lightning_app/runners/singleprocess.py delete mode 100644 tests/tests_app/runners/test_singleprocess.py diff --git a/src/lightning_app/runners/__init__.py b/src/lightning_app/runners/__init__.py index e2300663c4930..7749cbbae561e 100644 --- a/src/lightning_app/runners/__init__.py +++ b/src/lightning_app/runners/__init__.py @@ -1,7 +1,6 @@ from lightning_app.runners.cloud import CloudRuntime from lightning_app.runners.multiprocess import MultiProcessRuntime from lightning_app.runners.runtime import dispatch, Runtime -from lightning_app.runners.singleprocess import SingleProcessRuntime from lightning_app.utilities.app_commands import run_app_commands from lightning_app.utilities.load_app import load_app_from_file @@ -11,6 +10,5 @@ "run_app_commands", "Runtime", "MultiProcessRuntime", - "SingleProcessRuntime", "CloudRuntime", ] diff --git a/src/lightning_app/runners/runtime_type.py b/src/lightning_app/runners/runtime_type.py index aca045625f5e9..979cf5c540751 100644 --- a/src/lightning_app/runners/runtime_type.py +++ b/src/lightning_app/runners/runtime_type.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Type, TYPE_CHECKING -from lightning_app.runners import CloudRuntime, MultiProcessRuntime, SingleProcessRuntime +from lightning_app.runners import CloudRuntime, MultiProcessRuntime if TYPE_CHECKING: from lightning_app.runners.runtime import Runtime @@ -13,9 +13,7 @@ class RuntimeType(Enum): CLOUD = "cloud" def get_runtime(self) -> Type["Runtime"]: - if self == RuntimeType.SINGLEPROCESS: - return SingleProcessRuntime - elif self == RuntimeType.MULTIPROCESS: + if self == RuntimeType.MULTIPROCESS: return MultiProcessRuntime elif self == RuntimeType.CLOUD: return CloudRuntime diff --git a/src/lightning_app/runners/singleprocess.py b/src/lightning_app/runners/singleprocess.py deleted file mode 100644 index 61a67ce9ba904..0000000000000 --- a/src/lightning_app/runners/singleprocess.py +++ /dev/null @@ -1,62 +0,0 @@ -import multiprocessing as mp -import os -from typing import Any - -import click - -from lightning_app.core.api import start_server -from lightning_app.core.queues import QueuingSystem -from lightning_app.runners.runtime import Runtime -from lightning_app.utilities.app_helpers import _is_headless -from lightning_app.utilities.load_app import extract_metadata_from_app - - -class SingleProcessRuntime(Runtime): - """Runtime to launch the LightningApp into a single process.""" - - def __post_init__(self): - pass - - def dispatch(self, *args, open_ui: bool = True, **kwargs: Any): - """Method to dispatch and run the LightningApp.""" - queue = QueuingSystem.SINGLEPROCESS - - self.app.delta_queue = queue.get_delta_queue() - self.app.state_update_queue = queue.get_caller_queue(work_name="single_worker") - self.app.error_queue = queue.get_error_queue() - - if self.start_server: - self.app.should_publish_changes_to_api = True - self.app.api_publish_state_queue = QueuingSystem.MULTIPROCESS.get_api_state_publish_queue() - self.app.api_delta_queue = QueuingSystem.MULTIPROCESS.get_api_delta_queue() - has_started_queue = QueuingSystem.MULTIPROCESS.get_has_server_started_queue() - kwargs = dict( - host=self.host, - port=self.port, - api_publish_state_queue=self.app.api_publish_state_queue, - api_delta_queue=self.app.api_delta_queue, - has_started_queue=has_started_queue, - spec=extract_metadata_from_app(self.app), - root_path=self.app.root_path, - ) - server_proc = mp.Process(target=start_server, kwargs=kwargs) - self.processes["server"] = server_proc - server_proc.start() - - # wait for server to be ready. - has_started_queue.get() - - if open_ui and not _is_headless(self.app): - click.launch(self._get_app_url()) - - try: - self.app._run() - except KeyboardInterrupt: - self.terminate() - raise - finally: - self.terminate() - - @staticmethod - def _get_app_url() -> str: - return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view") diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index 82b58cc36fac3..e7dd48465872b 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -28,7 +28,7 @@ UIRefresher, ) from lightning_app.core.constants import APP_SERVER_PORT -from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime +from lightning_app.runners import MultiProcessRuntime from lightning_app.storage.drive import Drive from lightning_app.testing.helpers import _MockQueue from lightning_app.utilities.component import _set_frontend_context, _set_work_context @@ -71,7 +71,6 @@ def run(self): self.work_a.run() -# TODO: Resolve singleprocess - idea: explore frame calls recursively. @pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) def test_app_state_api(runtime_cls): """This test validates the AppState can properly broadcast changes from work within its own process.""" @@ -85,36 +84,6 @@ def test_app_state_api(runtime_cls): os.remove("test_app_state_api.txt") -class A2(LightningFlow): - def __init__(self): - super().__init__() - self.var_a = 0 - self.a = _A() - - def update_state(self): - state = AppState() - # this would download and push data to the REST API. - assert state.a.work_a.var_a == 0 - assert state.var_a == 0 - state.var_a = -1 - - def run(self): - if self.var_a == 0: - self.update_state() - elif self.var_a == -1: - self._exit() - - -# TODO: Find why this test is flaky. -@pytest.mark.skip(reason="flaky test.") -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime]) -def test_app_state_api_with_flows(runtime_cls, tmpdir): - """This test validates the AppState can properly broadcast changes from flows.""" - app = LightningApp(A2(), log_level="debug") - runtime_cls(app, start_server=True).dispatch() - assert app.root.var_a == -1 - - class NestedFlow(LightningFlow): def run(self): pass @@ -181,7 +150,7 @@ def maybe_apply_changes(self): # FIXME: This test doesn't assert anything @pytest.mark.skip(reason="TODO: Resolve flaky test.") -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime]) +@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) def test_app_stage_from_frontend(runtime_cls): """This test validates that delta from the `api_delta_queue` manipulating the ['app_state']['stage'] would start and stop the app.""" diff --git a/tests/tests_app/core/test_lightning_app.py b/tests/tests_app/core/test_lightning_app.py index e5c265e2efde9..678422aa40ab8 100644 --- a/tests/tests_app/core/test_lightning_app.py +++ b/tests/tests_app/core/test_lightning_app.py @@ -4,7 +4,6 @@ from re import escape from time import sleep from unittest import mock -from unittest.mock import ANY import pytest from deepdiff import Delta @@ -21,7 +20,7 @@ ) from lightning_app.core.queues import BaseQueue, MultiProcessQueue, RedisQueue, SingleProcessQueue from lightning_app.frontend import StreamlitFrontend -from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime +from lightning_app.runners import MultiProcessRuntime from lightning_app.storage import Path from lightning_app.storage.path import _storage_root_dir from lightning_app.testing.helpers import _RunIf @@ -89,56 +88,6 @@ def run(self): self.has_finished = True -class SimpleFlow(LightningFlow): - def __init__(self): - super().__init__() - self.work_a = Work(cache_calls=True) - self.work_b = Work(cache_calls=False) - - def run(self): - self.work_a.run() - self.work_b.run() - if self.work_a.has_finished and self.work_b.has_finished: - self._exit() - - -@pytest.mark.skip -@pytest.mark.parametrize("component_cls", [SimpleFlow]) -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime]) -def test_simple_app(component_cls, runtime_cls, tmpdir): - comp = component_cls() - app = LightningApp(comp, log_level="debug") - assert app.root == comp - expected = { - "app_state": ANY, - "vars": {"_layout": ANY, "_paths": {}}, - "calls": {}, - "flows": {}, - "works": { - "work_b": { - "vars": {"has_finished": False, "counter": 0, "_urls": {}, "_paths": {}}, - "calls": {}, - "changes": {}, - }, - "work_a": { - "vars": {"has_finished": False, "counter": 0, "_urls": {}, "_paths": {}}, - "calls": {}, - "changes": {}, - }, - }, - "changes": {}, - } - assert app.state == expected - runtime_cls(app, start_server=False).dispatch() - - assert comp.work_a.has_finished - assert comp.work_b.has_finished - # possible the `work_a` takes for ever to - # start and `work_b` has already completed multiple iterations. - assert comp.work_a.counter == 1 - assert comp.work_b.counter >= 3 - - class WorkCounter(LightningWork): def __init__(self): super().__init__() @@ -357,7 +306,7 @@ def _apply_restarting(self): return True -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime]) +@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) def test_app_restarting_move_to_blocking(runtime_cls, tmpdir): """Validates sending restarting move the app to blocking again.""" app = SimpleApp2(CounterFlow(), log_level="debug") diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index ed668c12e9b1b..05af00039170a 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -13,7 +13,7 @@ from lightning_app import LightningApp from lightning_app.core.flow import LightningFlow from lightning_app.core.work import LightningWork -from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime +from lightning_app.runners import MultiProcessRuntime from lightning_app.storage import Path from lightning_app.storage.path import _storage_root_dir from lightning_app.structures import Dict as LDict @@ -237,7 +237,7 @@ def run(self): flow = StateTransformationTest() assert flow.x == attribute app = LightningApp(flow) - SingleProcessRuntime(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() return app.state["vars"]["x"] @@ -519,7 +519,7 @@ def run(self): self._exit() -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime]) +@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) @pytest.mark.parametrize("run_once", [False, True]) def test_lightning_flow_iterate(tmpdir, runtime_cls, run_once): app = LightningApp(CFlow(run_once)) @@ -555,7 +555,7 @@ def run(self): self.counter += 1 -@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime]) +@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) def test_lightning_flow_counter(runtime_cls, tmpdir): app = LightningApp(FlowCounter()) diff --git a/tests/tests_app/runners/test_singleprocess.py b/tests/tests_app/runners/test_singleprocess.py deleted file mode 100644 index 998f23e66296f..0000000000000 --- a/tests/tests_app/runners/test_singleprocess.py +++ /dev/null @@ -1,35 +0,0 @@ -import os -from unittest import mock - -import pytest - -from lightning_app import LightningFlow -from lightning_app.core.app import LightningApp -from lightning_app.runners import SingleProcessRuntime - - -class Flow(LightningFlow): - def run(self): - raise KeyboardInterrupt - - -def on_before_run(): - pass - - -def test_single_process_runtime(tmpdir): - - app = LightningApp(Flow()) - SingleProcessRuntime(app, start_server=False).dispatch(on_before_run=on_before_run) - - -@pytest.mark.parametrize( - "env,expected_url", - [ - ({}, "http://127.0.0.1:7501/view"), - ({"APP_SERVER_HOST": "http://test"}, "http://test"), - ], -) -def test_get_app_url(env, expected_url): - with mock.patch.dict(os.environ, env): - assert SingleProcessRuntime._get_app_url() == expected_url diff --git a/tests/tests_app/structures/test_structures.py b/tests/tests_app/structures/test_structures.py index 05905c3421bec..cd38689528484 100644 --- a/tests/tests_app/structures/test_structures.py +++ b/tests/tests_app/structures/test_structures.py @@ -4,7 +4,7 @@ import pytest from lightning_app import LightningApp, LightningFlow, LightningWork -from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime +from lightning_app.runners import MultiProcessRuntime from lightning_app.storage.payload import Payload from lightning_app.structures import Dict, List from lightning_app.testing.helpers import EmptyFlow @@ -309,7 +309,7 @@ def run(self): @pytest.mark.skip(reason="tchaton: Resolve this test.") -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime, SingleProcessRuntime]) +@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) @pytest.mark.parametrize("run_once_iterable", [False, True]) @pytest.mark.parametrize("cache_calls", [False, True]) @pytest.mark.parametrize("use_list", [False, True]) From 7baff02c933d0b4da5bd6d60a2334af0bf2d7ad9 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 11:01:22 +0000 Subject: [PATCH 02/21] Remove unused queues --- docs/source-app/testing.rst | 3 --- pyproject.toml | 1 - src/lightning_app/core/app.py | 4 +--- src/lightning_app/core/queues.py | 20 +------------------- src/lightning_app/runners/runtime_type.py | 1 - src/lightning_app/utilities/app_helpers.py | 7 ------- tests/tests_app/core/test_lightning_app.py | 5 ++--- tests/tests_app/core/test_queues.py | 2 +- tests/tests_app/runners/test_runtime.py | 1 - 9 files changed, 5 insertions(+), 39 deletions(-) diff --git a/docs/source-app/testing.rst b/docs/source-app/testing.rst index 6d0fe71832a7e..da52727cbde0d 100644 --- a/docs/source-app/testing.rst +++ b/docs/source-app/testing.rst @@ -120,7 +120,6 @@ We provide ``application_testing`` as a helper funtion to get your application u os.path.join(_PROJECT_ROOT, "examples/app_v0/app.py"), "--blocking", "False", - "--multiprocess", "--open-ui", "False", ] @@ -129,9 +128,7 @@ First in the list for ``command_line`` is the location of your script. It is an Next there are a couple of options you can leverage: - * ``blocking`` - Blocking is an app status that says "Do not run until I click run in the UI". For our integration test, since we are not using the UI, we are setting this to "False". -* ``multiprocess/singleprocess`` - This is the runtime your app is expected to run under. * ``open-ui`` - We set this to false since this is the routine that opens a browser for your local execution. Once you have your commandline ready, you will then be able to kick off the test and gather results: diff --git a/pyproject.toml b/pyproject.toml index 6587db0a2c80e..87dfb33b1e786 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,6 @@ module = [ "lightning_app.runners.cloud", "lightning_app.runners.multiprocess", "lightning_app.runners.runtime", - "lightning_app.runners.singleprocess", "lightning_app.source_code.copytree", "lightning_app.source_code.hashing", "lightning_app.source_code.local", diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 42cf0f241b47e..bcf3c2789098e 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -21,7 +21,7 @@ FRONTEND_DIR, STATE_ACCUMULATE_WAIT, ) -from lightning_app.core.queues import BaseQueue, SingleProcessQueue +from lightning_app.core.queues import BaseQueue from lightning_app.core.work import LightningWork from lightning_app.frontend import Frontend from lightning_app.storage import Drive, Path, Payload @@ -549,8 +549,6 @@ def _collect_work_finish_status(self) -> dict: def _should_snapshot(self) -> bool: if len(self.works) == 0: return True - elif isinstance(self.delta_queue, SingleProcessQueue): - return True elif self._has_updated: work_finished_status = self._collect_work_finish_status() if work_finished_status: diff --git a/src/lightning_app/core/queues.py b/src/lightning_app/core/queues.py index f38942915abc3..db150a57eb098 100644 --- a/src/lightning_app/core/queues.py +++ b/src/lightning_app/core/queues.py @@ -49,7 +49,6 @@ class QueuingSystem(Enum): - SINGLEPROCESS = "singleprocess" MULTIPROCESS = "multiprocess" REDIS = "redis" HTTP = "http" @@ -59,10 +58,8 @@ def get_queue(self, queue_name: str) -> "BaseQueue": return MultiProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT) elif self == QueuingSystem.REDIS: return RedisQueue(queue_name, default_timeout=REDIS_QUEUES_READ_DEFAULT_TIMEOUT) - elif self == QueuingSystem.HTTP: - return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT) else: - return SingleProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT) + return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT) def get_api_response_queue(self, queue_id: Optional[str] = None) -> "BaseQueue": queue_name = f"{queue_id}_{API_RESPONSE_QUEUE_CONSTANT}" if queue_id else API_RESPONSE_QUEUE_CONSTANT @@ -179,21 +176,6 @@ def is_running(self) -> bool: return True -class SingleProcessQueue(BaseQueue): - def __init__(self, name: str, default_timeout: float): - self.name = name - self.default_timeout = default_timeout - self.queue = queue.Queue() - - def put(self, item): - self.queue.put(item) - - def get(self, timeout: int = None): - if timeout == 0: - timeout = self.default_timeout - return self.queue.get(timeout=timeout, block=(timeout is None)) - - class MultiProcessQueue(BaseQueue): def __init__(self, name: str, default_timeout: float): self.name = name diff --git a/src/lightning_app/runners/runtime_type.py b/src/lightning_app/runners/runtime_type.py index 979cf5c540751..c5a9b60f89072 100644 --- a/src/lightning_app/runners/runtime_type.py +++ b/src/lightning_app/runners/runtime_type.py @@ -8,7 +8,6 @@ class RuntimeType(Enum): - SINGLEPROCESS = "singleprocess" MULTIPROCESS = "multiprocess" CLOUD = "cloud" diff --git a/src/lightning_app/utilities/app_helpers.py b/src/lightning_app/utilities/app_helpers.py index a000af3e71fe6..f07ae6bc88c1c 100644 --- a/src/lightning_app/utilities/app_helpers.py +++ b/src/lightning_app/utilities/app_helpers.py @@ -130,13 +130,6 @@ def set_served_session_id(self, k, v): self.store[k].session_id = v -class DistributedMode(enum.Enum): - SINGLEPROCESS = enum.auto() - MULTIPROCESS = enum.auto() - CONTAINER = enum.auto() - GRID = enum.auto() - - class _LightningAppRef: _app_instance: Optional["LightningApp"] = None diff --git a/tests/tests_app/core/test_lightning_app.py b/tests/tests_app/core/test_lightning_app.py index 678422aa40ab8..0027e60aa1bfd 100644 --- a/tests/tests_app/core/test_lightning_app.py +++ b/tests/tests_app/core/test_lightning_app.py @@ -18,7 +18,7 @@ REDIS_QUEUES_READ_DEFAULT_TIMEOUT, STATE_UPDATE_TIMEOUT, ) -from lightning_app.core.queues import BaseQueue, MultiProcessQueue, RedisQueue, SingleProcessQueue +from lightning_app.core.queues import BaseQueue, MultiProcessQueue, RedisQueue from lightning_app.frontend import StreamlitFrontend from lightning_app.runners import MultiProcessRuntime from lightning_app.storage import Path @@ -360,7 +360,6 @@ def run(self): @pytest.mark.parametrize( "queue_type_cls, default_timeout", [ - (SingleProcessQueue, STATE_UPDATE_TIMEOUT), (MultiProcessQueue, STATE_UPDATE_TIMEOUT), pytest.param( RedisQueue, @@ -426,7 +425,7 @@ def test_maybe_apply_changes_from_flow(): """This test validates the app `_updated` is set to True only if the state was changed in the flow.""" app = LightningApp(SimpleFlow()) - app.delta_queue = SingleProcessQueue("a", 0) + app.delta_queue = MultiProcessQueue("a", 0) assert app._has_updated app.maybe_apply_changes() app.root.run() diff --git a/tests/tests_app/core/test_queues.py b/tests/tests_app/core/test_queues.py index 0f930bcacabc7..9628e2414d5cf 100644 --- a/tests/tests_app/core/test_queues.py +++ b/tests/tests_app/core/test_queues.py @@ -16,7 +16,7 @@ @pytest.mark.skipif(not check_if_redis_running(), reason="Redis is not running") -@pytest.mark.parametrize("queue_type", [QueuingSystem.REDIS, QueuingSystem.MULTIPROCESS, QueuingSystem.SINGLEPROCESS]) +@pytest.mark.parametrize("queue_type", [QueuingSystem.REDIS, QueuingSystem.MULTIPROCESS]) def test_queue_api(queue_type, monkeypatch): """Test the Queue API. diff --git a/tests/tests_app/runners/test_runtime.py b/tests/tests_app/runners/test_runtime.py index c79ef1207cae9..cf0e1feea34ae 100644 --- a/tests/tests_app/runners/test_runtime.py +++ b/tests/tests_app/runners/test_runtime.py @@ -13,7 +13,6 @@ @pytest.mark.parametrize( "runtime_type", [ - RuntimeType.SINGLEPROCESS, RuntimeType.MULTIPROCESS, RuntimeType.CLOUD, ], From bc65bd3ddd7aa6d8096042516212000269388985 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 11:07:55 +0000 Subject: [PATCH 03/21] Docs --- docs/source-app/api_references.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/source-app/api_references.rst b/docs/source-app/api_references.rst index 30e0ade3a25ad..8b43696af7a7e 100644 --- a/docs/source-app/api_references.rst +++ b/docs/source-app/api_references.rst @@ -87,5 +87,4 @@ _______ :template: classtemplate_no_index.rst ~cloud.CloudRuntime - ~singleprocess.SingleProcessRuntime ~multiprocess.MultiProcessRuntime From cb5ac9e004dbbccda19659d7cbdc8630840a9e13 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 11:09:22 +0000 Subject: [PATCH 04/21] Update CHANGELOG.md --- src/lightning_app/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index f7cd8be3db982..41f4581c52e03 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -40,7 +40,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Removed -- +- Removed the `SingleProcessRuntime` ([#15933](https://github.com/Lightning-AI/lightning/pull/15933)) ### Fixed From 399edd8b667c204299b9b5eb898885b0e6b6294f Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 11:22:44 +0000 Subject: [PATCH 05/21] Docs --- docs/source-app/api_reference/runners.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/source-app/api_reference/runners.rst b/docs/source-app/api_reference/runners.rst index 3040d3adde36c..1036df1731eb8 100644 --- a/docs/source-app/api_reference/runners.rst +++ b/docs/source-app/api_reference/runners.rst @@ -18,5 +18,4 @@ ______________ :template: classtemplate.rst ~cloud.CloudRuntime - ~singleprocess.SingleProcessRuntime ~multiprocess.MultiProcessRuntime From b6189e251a7a63b99c373c29c0cbde8883934c27 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 13:20:48 +0000 Subject: [PATCH 06/21] Remove parametrization --- tests/tests_app/core/test_lightning_api.py | 10 ++++------ tests/tests_app/core/test_lightning_flow.py | 14 ++++++-------- tests/tests_app/structures/test_structures.py | 7 +++---- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index e7dd48465872b..6880af4a96401 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -71,11 +71,10 @@ def run(self): self.work_a.run() -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) -def test_app_state_api(runtime_cls): +def test_app_state_api(): """This test validates the AppState can properly broadcast changes from work within its own process.""" app = LightningApp(_A(), log_level="debug") - runtime_cls(app, start_server=True).dispatch() + MultiProcessRuntime(app, start_server=True).dispatch() assert app.root.work_a.var_a == -1 _set_work_context() assert app.root.work_a.drive.list(".") == ["test_app_state_api.txt"] @@ -150,13 +149,12 @@ def maybe_apply_changes(self): # FIXME: This test doesn't assert anything @pytest.mark.skip(reason="TODO: Resolve flaky test.") -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) -def test_app_stage_from_frontend(runtime_cls): +def test_app_stage_from_frontend(): """This test validates that delta from the `api_delta_queue` manipulating the ['app_state']['stage'] would start and stop the app.""" app = AppStageTestingApp(FlowA(), log_level="debug") app.stage = AppStage.BLOCKING - runtime_cls(app, start_server=True).dispatch() + MultiProcessRuntime(app, start_server=True).dispatch() def test_update_publish_state_and_maybe_refresh_ui(): diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index 05af00039170a..dacccfb3873aa 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -519,11 +519,10 @@ def run(self): self._exit() -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) @pytest.mark.parametrize("run_once", [False, True]) -def test_lightning_flow_iterate(tmpdir, runtime_cls, run_once): +def test_lightning_flow_iterate(tmpdir, run_once): app = LightningApp(CFlow(run_once)) - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() assert app.root.looping == 0 assert app.root.tracker == 4 call_hash = list(v for v in app.root._calls if "experimental_iterate" in v)[0] @@ -537,7 +536,7 @@ def test_lightning_flow_iterate(tmpdir, runtime_cls, run_once): app.root.restarting = True assert app.root.looping == 0 assert app.root.tracker == 4 - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() assert app.root.looping == 2 assert app.root.tracker == 10 if run_once else 20 iterate_call = app.root._calls[call_hash] @@ -555,12 +554,11 @@ def run(self): self.counter += 1 -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) -def test_lightning_flow_counter(runtime_cls, tmpdir): +def test_lightning_flow_counter(tmpdir): app = LightningApp(FlowCounter()) app.checkpointing = True - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() assert app.root.counter == 3 checkpoint_dir = os.path.join(_storage_root_dir(), "checkpoints") @@ -571,7 +569,7 @@ def test_lightning_flow_counter(runtime_cls, tmpdir): with open(checkpoint_path, "rb") as f: app = LightningApp(FlowCounter()) app.set_state(pickle.load(f)) - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() assert app.root.counter == 3 diff --git a/tests/tests_app/structures/test_structures.py b/tests/tests_app/structures/test_structures.py index cd38689528484..b0913a219f73d 100644 --- a/tests/tests_app/structures/test_structures.py +++ b/tests/tests_app/structures/test_structures.py @@ -309,11 +309,10 @@ def run(self): @pytest.mark.skip(reason="tchaton: Resolve this test.") -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) @pytest.mark.parametrize("run_once_iterable", [False, True]) @pytest.mark.parametrize("cache_calls", [False, True]) @pytest.mark.parametrize("use_list", [False, True]) -def test_structure_with_iterate_and_fault_tolerance(runtime_cls, run_once_iterable, cache_calls, use_list): +def test_structure_with_iterate_and_fault_tolerance(run_once_iterable, cache_calls, use_list): class DummyFlow(LightningFlow): def __init__(self): super().__init__() @@ -360,7 +359,7 @@ def run(self): self.looping += 1 app = LightningApp(RootFlow(use_list, run_once_iterable, cache_calls)) - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() assert app.root.iter[0 if use_list else "0"].counter == 1 assert app.root.iter[1 if use_list else "1"].counter == 0 assert app.root.iter[2 if use_list else "2"].counter == 0 @@ -368,7 +367,7 @@ def run(self): app = LightningApp(RootFlow(use_list, run_once_iterable, cache_calls)) app.root.restarting = True - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() if run_once_iterable: expected_value = 1 From e30a1a4f3ea686dddf9fe5a7361be693a4cc6c02 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 14:32:57 +0000 Subject: [PATCH 07/21] Add back tests --- src/lightning_app/utilities/state.py | 28 +++++--- tests/tests_app/core/test_lightning_api.py | 27 ++++++++ tests/tests_app/core/test_lightning_app.py | 76 +++++++++++++++++++++- 3 files changed, 119 insertions(+), 12 deletions(-) diff --git a/src/lightning_app/utilities/state.py b/src/lightning_app/utilities/state.py index a882953ab0450..51f1cdd2b8794 100644 --- a/src/lightning_app/utilities/state.py +++ b/src/lightning_app/utilities/state.py @@ -2,6 +2,7 @@ import json import os from copy import deepcopy +from time import sleep from typing import Any, Dict, List, Optional, Tuple, Union from deepdiff import DeepDiff @@ -149,16 +150,25 @@ def _request_state(self) -> None: return app_url = f"{self._url}/api/v1/state" headers = headers_for(self._plugin.get_context()) if self._plugin else {} - try: - response = self._session.get(app_url, headers=headers, timeout=1) - except ConnectionError as e: - raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e - self._authorized = response.status_code - if self._authorized != 200: - return - logger.debug(f"GET STATE {response} {response.json()}") - self._store_state(response.json()) + response_json = {} + + # Sometimes the state URL can return an empty JSON when things are being set-up, + # so we wait for it to be ready here. + while response_json == {}: + try: + response = self._session.get(app_url, headers=headers, timeout=1) + except ConnectionError as e: + raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e + + self._authorized = response.status_code + if self._authorized == 200: + response_json = response.json() + + sleep(0.5) + + logger.debug(f"GET STATE {response} {response_json}") + self._store_state(response_json) def __getattr__(self, name: str) -> Union[Any, "AppState"]: if name in self._APP_PRIVATE_KEYS: diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index 6880af4a96401..04b89c927941a 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -83,6 +83,33 @@ def test_app_state_api(): os.remove("test_app_state_api.txt") +class A2(LightningFlow): + def __init__(self): + super().__init__() + self.var_a = 0 + self.a = _A() + + def update_state(self): + state = AppState() + # this would download and push data to the REST API. + assert state.a.work_a.var_a == 0 + assert state.var_a == 0 + state.var_a = -1 + + def run(self): + if self.var_a == 0: + self.update_state() + elif self.var_a == -1: + self._exit() + + +def test_app_state_api_with_flows(tmpdir): + """This test validates the AppState can properly broadcast changes from flows.""" + app = LightningApp(A2(), log_level="debug") + MultiProcessRuntime(app, start_server=True).dispatch() + assert app.root.var_a == -1 + + class NestedFlow(LightningFlow): def run(self): pass diff --git a/tests/tests_app/core/test_lightning_app.py b/tests/tests_app/core/test_lightning_app.py index 0027e60aa1bfd..6a89ef5e5d519 100644 --- a/tests/tests_app/core/test_lightning_app.py +++ b/tests/tests_app/core/test_lightning_app.py @@ -81,13 +81,83 @@ def __init__(self, cache_calls: bool = True): self.has_finished = False def run(self): - self.counter += 1 + self.counter = self.counter + 1 if self.cache_calls: self.has_finished = True elif self.counter >= 3: self.has_finished = True +class SimpleFlow(LightningFlow): + def __init__(self): + super().__init__() + self.work_a = Work(cache_calls=True) + self.work_b = Work(cache_calls=False) + + def run(self): + if self.work_a.has_finished and self.work_b.has_finished: + self._exit() + self.work_a.run() + self.work_b.run() + + +def test_simple_app(tmpdir): + comp = SimpleFlow() + app = LightningApp(comp, log_level="debug") + assert app.root == comp + expected = { + "app_state": mock.ANY, + "vars": {"_layout": mock.ANY, "_paths": {}}, + "calls": {}, + "flows": {}, + "structures": {}, + "works": { + "work_b": { + "vars": { + "has_finished": False, + "counter": 0, + "_cloud_compute": mock.ANY, + "_host": mock.ANY, + "_url": "", + "_future_url": "", + "_internal_ip": "", + "_paths": {}, + "_port": None, + "_restarting": False, + }, + "calls": {"latest_call_hash": None}, + "changes": {}, + }, + "work_a": { + "vars": { + "has_finished": False, + "counter": 0, + "_cloud_compute": mock.ANY, + "_host": mock.ANY, + "_url": "", + "_future_url": "", + "_internal_ip": "", + "_paths": {}, + "_port": None, + "_restarting": False, + }, + "calls": {"latest_call_hash": None}, + "changes": {}, + }, + }, + "changes": {}, + } + assert app.state == expected + MultiProcessRuntime(app, start_server=False).dispatch() + + assert comp.work_a.has_finished + assert comp.work_b.has_finished + # possible the `work_a` takes for ever to + # start and `work_b` has already completed multiple iterations. + assert comp.work_a.counter == 1 + assert comp.work_b.counter >= 3 + + class WorkCounter(LightningWork): def __init__(self): super().__init__() @@ -411,7 +481,7 @@ def make_delta(i): assert generated > expect -class SimpleFlow(LightningFlow): +class SimpleFlow2(LightningFlow): def __init__(self): super().__init__() self.counter = 0 @@ -424,7 +494,7 @@ def run(self): def test_maybe_apply_changes_from_flow(): """This test validates the app `_updated` is set to True only if the state was changed in the flow.""" - app = LightningApp(SimpleFlow()) + app = LightningApp(SimpleFlow2()) app.delta_queue = MultiProcessQueue("a", 0) assert app._has_updated app.maybe_apply_changes() From 9cafa6f4508c397ba1f7bbad7ec8cdfb2610073b Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 14:34:40 +0000 Subject: [PATCH 08/21] Remove parametrization --- tests/tests_app/core/test_lightning_app.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/tests_app/core/test_lightning_app.py b/tests/tests_app/core/test_lightning_app.py index 6a89ef5e5d519..ea552adad7972 100644 --- a/tests/tests_app/core/test_lightning_app.py +++ b/tests/tests_app/core/test_lightning_app.py @@ -376,11 +376,10 @@ def _apply_restarting(self): return True -@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime]) -def test_app_restarting_move_to_blocking(runtime_cls, tmpdir): +def test_app_restarting_move_to_blocking(tmpdir): """Validates sending restarting move the app to blocking again.""" app = SimpleApp2(CounterFlow(), log_level="debug") - runtime_cls(app, start_server=False).dispatch() + MultiProcessRuntime(app, start_server=False).dispatch() class FlowWithFrontend(LightningFlow): From d7a5c13ff6b0d8106b3002ddee27ff51ee2bf7c5 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 15:12:32 +0000 Subject: [PATCH 09/21] Fix --- src/lightning_app/utilities/state.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/lightning_app/utilities/state.py b/src/lightning_app/utilities/state.py index 51f1cdd2b8794..775fa49ddd0ba 100644 --- a/src/lightning_app/utilities/state.py +++ b/src/lightning_app/utilities/state.py @@ -156,16 +156,17 @@ def _request_state(self) -> None: # Sometimes the state URL can return an empty JSON when things are being set-up, # so we wait for it to be ready here. while response_json == {}: + sleep(0.5) try: response = self._session.get(app_url, headers=headers, timeout=1) except ConnectionError as e: raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e self._authorized = response.status_code - if self._authorized == 200: - response_json = response.json() + if self._authorized != 200: + return - sleep(0.5) + response_json = response.json() logger.debug(f"GET STATE {response} {response_json}") self._store_state(response_json) From 988fbad2a027af88bba88b8113c0b2b1200f8148 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 16:18:05 +0000 Subject: [PATCH 10/21] Fix streamlit example --- examples/app_template_streamlit_ui/app.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/app_template_streamlit_ui/app.py b/examples/app_template_streamlit_ui/app.py index 6f344ac98eb8d..92f17978365df 100644 --- a/examples/app_template_streamlit_ui/app.py +++ b/examples/app_template_streamlit_ui/app.py @@ -1,8 +1,8 @@ import logging -from lightning_app import LightningApp, LightningFlow -from lightning_app.frontend import StreamlitFrontend -from lightning_app.utilities.state import AppState +from lightning.app import LightningApp, LightningFlow +from lightning.app.frontend import StreamlitFrontend +from lightning.app.utilities.state import AppState logger = logging.getLogger(__name__) From f306b8deca3a3c661df9c0956846217a08998577 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 16:57:08 +0000 Subject: [PATCH 11/21] Try to improve streamlit UI e2e test stability --- .../public/test_template_streamlit_ui.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index c78022d4c878c..033ecee5b5fcd 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -26,9 +26,14 @@ def click_button(*_, **__): wait_for(view_page, click_button) + # Print a few more times to make sure we get logs + for _ in range(5): + button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') + button.click() + has_logs = False while not has_logs: for log in fetch_logs(): - if "0: Hello World!" in log: + if "Hello World!" in log: has_logs = True sleep(1) From 5ad1f20ad2d6a03221ae3c11c264391be2b4c007 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 17:20:07 +0000 Subject: [PATCH 12/21] Try something --- examples/app_template_streamlit_ui/app.py | 2 +- src/lightning_app/utilities/app_logs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/app_template_streamlit_ui/app.py b/examples/app_template_streamlit_ui/app.py index 92f17978365df..b6fc604222ce2 100644 --- a/examples/app_template_streamlit_ui/app.py +++ b/examples/app_template_streamlit_ui/app.py @@ -45,4 +45,4 @@ def configure_layout(self): return [{"name": "StreamLitUI", "content": self.streamlit_ui}] -app = LightningApp(HelloWorld(), log_level="debug") +app = LightningApp(HelloWorld()) diff --git a/src/lightning_app/utilities/app_logs.py b/src/lightning_app/utilities/app_logs.py index 369adc5d09f11..47b77a21ffc74 100644 --- a/src/lightning_app/utilities/app_logs.py +++ b/src/lightning_app/utilities/app_logs.py @@ -100,7 +100,7 @@ def _app_logs_reader( start_timestamps[log_event.component_name] = log_event.timestamp timestamp = start_timestamps.get(log_event.component_name, None) - if timestamp and log_event.timestamp >= timestamp: + if timestamp is None or log_event.timestamp >= timestamp: if "launcher" not in log_event.message: yield log_event From 57087956843cf3cc267cbf19716cf8b7517222a2 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 17:30:01 +0000 Subject: [PATCH 13/21] Debugging --- tests/tests_examples_app/public/test_template_streamlit_ui.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index 033ecee5b5fcd..554fcc186f1ce 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -26,14 +26,18 @@ def click_button(*_, **__): wait_for(view_page, click_button) + print("Button clicked") + # Print a few more times to make sure we get logs for _ in range(5): button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') button.click() + print("Button clicked") has_logs = False while not has_logs: for log in fetch_logs(): + print(log) if "Hello World!" in log: has_logs = True sleep(1) From 224e3f9d799deea8e9c4f513770d6f221f40c9e1 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 17:59:50 +0000 Subject: [PATCH 14/21] Try something --- src/lightning_app/utilities/app_logs.py | 2 +- .../public/test_template_streamlit_ui.py | 21 +++---------------- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/lightning_app/utilities/app_logs.py b/src/lightning_app/utilities/app_logs.py index 47b77a21ffc74..369adc5d09f11 100644 --- a/src/lightning_app/utilities/app_logs.py +++ b/src/lightning_app/utilities/app_logs.py @@ -100,7 +100,7 @@ def _app_logs_reader( start_timestamps[log_event.component_name] = log_event.timestamp timestamp = start_timestamps.get(log_event.component_name, None) - if timestamp is None or log_event.timestamp >= timestamp: + if timestamp and log_event.timestamp >= timestamp: if "launcher" not in log_event.message: yield log_event diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index 554fcc186f1ce..74a519d719a5f 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -4,7 +4,7 @@ import pytest from tests_examples_app.public import _PATH_EXAMPLES -from lightning_app.testing.testing import run_app_in_cloud, wait_for +from lightning_app.testing.testing import run_app_in_cloud @pytest.mark.cloud @@ -16,23 +16,8 @@ def test_template_streamlit_ui_example_cloud() -> None: fetch_logs, _, ): - - def click_button(*_, **__): - button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') - - if button.all_text_contents() == ["Should print to the terminal ?"]: - button.click() - return True - - wait_for(view_page, click_button) - - print("Button clicked") - - # Print a few more times to make sure we get logs - for _ in range(5): - button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') - button.click() - print("Button clicked") + button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') + button.click() has_logs = False while not has_logs: From da0645cb2ef139a7a2da40c3aee03d8b9fae2886 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 18:32:32 +0000 Subject: [PATCH 15/21] Try something --- .../public/test_template_streamlit_ui.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index 74a519d719a5f..8798659f4f54a 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -16,8 +16,25 @@ def test_template_streamlit_ui_example_cloud() -> None: fetch_logs, _, ): - button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') - button.click() + import playwright + + i = 0 + + while i < 2: + try: + button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') + button.click() + break + except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError) as e: + print(e) + try: + sleep(5) + view_page.reload() + except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError) as e: + print(e) + pass + sleep(2) + i = i + 1 has_logs = False while not has_logs: From 228cbc1e3711e0ea282ae7ad82b6df46ea9c7e97 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 18:52:08 +0000 Subject: [PATCH 16/21] Debugging --- src/lightning_app/testing/testing.py | 32 +++++++++---------- .../public/test_template_streamlit_ui.py | 4 +++ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index 8d112d7fa4a7a..0c44e62b9d4e0 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -385,6 +385,22 @@ def run_app_in_cloud( process = Process(target=_print_logs, kwargs={"app_id": app_id}) process.start() + # Wait until the app is running + while True: + sleep(1) + + lit_apps = [ + app + for app in client.lightningapp_instance_service_list_lightningapp_instances( + project_id=project.project_id + ).lightningapps + if app.name == name + ] + app = lit_apps[0] + + if app.status.phase == V1LightningappInstanceState.RUNNING: + break + if not app.spec.is_headless: while True: try: @@ -398,22 +414,6 @@ def run_app_in_cloud( else: view_page = None - # Wait until the app is running - while True: - sleep(1) - - lit_apps = [ - app - for app in client.lightningapp_instance_service_list_lightningapp_instances( - project_id=project.project_id - ).lightningapps - if app.name == name - ] - app = lit_apps[0] - - if app.status.phase == V1LightningappInstanceState.RUNNING: - break - # TODO: is re-creating this redundant? lit_apps = [ app diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index 8798659f4f54a..0eab28366ca2a 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -18,12 +18,16 @@ def test_template_streamlit_ui_example_cloud() -> None: ): import playwright + print("Reached") + i = 0 while i < 2: try: + print("clicking") button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') button.click() + print("clicked") break except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError) as e: print(e) From 3b5214aa86c214915d6baad494aca0a4c88f40dc Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 19:08:00 +0000 Subject: [PATCH 17/21] Debugging --- src/lightning_app/testing/testing.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index 0c44e62b9d4e0..aa8668e3cbf65 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -401,18 +401,21 @@ def run_app_in_cloud( if app.status.phase == V1LightningappInstanceState.RUNNING: break + view_page = None if not app.spec.is_headless: - while True: + for _ in range(5): try: + admin_page.reload() with admin_page.context.expect_page() as page_catcher: admin_page.locator('[data-cy="open"]').click() view_page = page_catcher.value view_page.wait_for_load_state(timeout=0) break except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError): - pass - else: - view_page = None + sleep(1) + + if view_page is None: + raise RuntimeError("Failed to open the app UI") # TODO: is re-creating this redundant? lit_apps = [ From c624d67eda57e5319e6b1acd4cff3f50b6f6bb77 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 22:25:58 +0000 Subject: [PATCH 18/21] Debugging --- src/lightning_app/frontend/streamlit_base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lightning_app/frontend/streamlit_base.py b/src/lightning_app/frontend/streamlit_base.py index 2cae7190cbdb0..f23087b69e0ef 100644 --- a/src/lightning_app/frontend/streamlit_base.py +++ b/src/lightning_app/frontend/streamlit_base.py @@ -14,7 +14,10 @@ def _get_render_fn_from_environment() -> Callable: render_fn_name = os.environ["LIGHTNING_RENDER_FUNCTION"] render_fn_module_file = os.environ["LIGHTNING_RENDER_MODULE_FILE"] - module = pydoc.importfile(render_fn_module_file) + try: + module = pydoc.importfile(render_fn_module_file) + except pydoc.ErrorDuringImport as exc: + raise exc.value from None return getattr(module, render_fn_name) From 193535b43ee1668b912d047ca896c231cae58b87 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 22:39:55 +0000 Subject: [PATCH 19/21] Try something --- src/lightning_app/components/auto_scaler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index ad9d69690b23d..4d41bf370bef3 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -25,7 +25,6 @@ from lightning_app.utilities.packaging.cloud_compute import CloudCompute logger = Logger(__name__) -lock = asyncio.Lock() def _raise_granular_exception(exception: Exception) -> None: @@ -209,6 +208,7 @@ async def process_request(self, data: BaseModel): def run(self): logger.info(f"servers: {self.servers}") + lock = asyncio.Lock() self._iter = cycle(self.servers) self._last_batch_sent = time.time() From cfd8daeb460370526d13bb25d20bbcd6a95aca3c Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 22:52:26 +0000 Subject: [PATCH 20/21] Cleanup --- src/lightning_app/frontend/streamlit_base.py | 5 +-- src/lightning_app/testing/testing.py | 39 +++++++++---------- .../public/test_template_streamlit_ui.py | 31 ++++----------- 3 files changed, 27 insertions(+), 48 deletions(-) diff --git a/src/lightning_app/frontend/streamlit_base.py b/src/lightning_app/frontend/streamlit_base.py index f23087b69e0ef..2cae7190cbdb0 100644 --- a/src/lightning_app/frontend/streamlit_base.py +++ b/src/lightning_app/frontend/streamlit_base.py @@ -14,10 +14,7 @@ def _get_render_fn_from_environment() -> Callable: render_fn_name = os.environ["LIGHTNING_RENDER_FUNCTION"] render_fn_module_file = os.environ["LIGHTNING_RENDER_MODULE_FILE"] - try: - module = pydoc.importfile(render_fn_module_file) - except pydoc.ErrorDuringImport as exc: - raise exc.value from None + module = pydoc.importfile(render_fn_module_file) return getattr(module, render_fn_name) diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index aa8668e3cbf65..989539964dc62 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -385,25 +385,9 @@ def run_app_in_cloud( process = Process(target=_print_logs, kwargs={"app_id": app_id}) process.start() - # Wait until the app is running - while True: - sleep(1) - - lit_apps = [ - app - for app in client.lightningapp_instance_service_list_lightningapp_instances( - project_id=project.project_id - ).lightningapps - if app.name == name - ] - app = lit_apps[0] - - if app.status.phase == V1LightningappInstanceState.RUNNING: - break - view_page = None if not app.spec.is_headless: - for _ in range(5): + while True: try: admin_page.reload() with admin_page.context.expect_page() as page_catcher: @@ -412,10 +396,23 @@ def run_app_in_cloud( view_page.wait_for_load_state(timeout=0) break except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError): - sleep(1) - - if view_page is None: - raise RuntimeError("Failed to open the app UI") + pass + else: + # Wait until the app is running + while True: + sleep(1) + + lit_apps = [ + app + for app in client.lightningapp_instance_service_list_lightningapp_instances( + project_id=project.project_id + ).lightningapps + if app.name == name + ] + app = lit_apps[0] + + if app.status.phase == V1LightningappInstanceState.RUNNING: + break # TODO: is re-creating this redundant? lit_apps = [ diff --git a/tests/tests_examples_app/public/test_template_streamlit_ui.py b/tests/tests_examples_app/public/test_template_streamlit_ui.py index 0eab28366ca2a..c78022d4c878c 100644 --- a/tests/tests_examples_app/public/test_template_streamlit_ui.py +++ b/tests/tests_examples_app/public/test_template_streamlit_ui.py @@ -4,7 +4,7 @@ import pytest from tests_examples_app.public import _PATH_EXAMPLES -from lightning_app.testing.testing import run_app_in_cloud +from lightning_app.testing.testing import run_app_in_cloud, wait_for @pytest.mark.cloud @@ -16,34 +16,19 @@ def test_template_streamlit_ui_example_cloud() -> None: fetch_logs, _, ): - import playwright - print("Reached") + def click_button(*_, **__): + button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') - i = 0 - - while i < 2: - try: - print("clicking") - button = view_page.frame_locator("iframe").locator('button:has-text("Should print to the terminal ?")') + if button.all_text_contents() == ["Should print to the terminal ?"]: button.click() - print("clicked") - break - except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError) as e: - print(e) - try: - sleep(5) - view_page.reload() - except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError) as e: - print(e) - pass - sleep(2) - i = i + 1 + return True + + wait_for(view_page, click_button) has_logs = False while not has_logs: for log in fetch_logs(): - print(log) - if "Hello World!" in log: + if "0: Hello World!" in log: has_logs = True sleep(1) From 60653e3df452d97abd2d056217bfaf8908afa48c Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 22:54:14 +0000 Subject: [PATCH 21/21] Cleanup --- src/lightning_app/testing/testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index 989539964dc62..8d112d7fa4a7a 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -385,11 +385,9 @@ def run_app_in_cloud( process = Process(target=_print_logs, kwargs={"app_id": app_id}) process.start() - view_page = None if not app.spec.is_headless: while True: try: - admin_page.reload() with admin_page.context.expect_page() as page_catcher: admin_page.locator('[data-cy="open"]').click() view_page = page_catcher.value @@ -398,6 +396,8 @@ def run_app_in_cloud( except (playwright._impl._api_types.Error, playwright._impl._api_types.TimeoutError): pass else: + view_page = None + # Wait until the app is running while True: sleep(1)