From 36a67d81230ef37d992060b9dd87309df4d8e233 Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Mon, 18 Dec 2023 17:31:34 +0000 Subject: [PATCH 01/11] update --- src/lightning/app/core/app.py | 17 ++++++++---- src/lightning/app/core/queues.py | 47 +++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/lightning/app/core/app.py b/src/lightning/app/core/app.py index ffedad588e3c8..c3b0ccf9f89f1 100644 --- a/src/lightning/app/core/app.py +++ b/src/lightning/app/core/app.py @@ -308,6 +308,14 @@ def get_state_changed_from_queue(q: BaseQueue, timeout: Optional[float] = None) except queue.Empty: return None + @staticmethod + def batch_get_state_changed_from_queue(q: BaseQueue, timeout: Optional[float] = None) -> List[dict]: + try: + timeout = timeout or q.default_timeout + return q.get_all(timeout=timeout) + except queue.Empty: + return [] + def check_error_queue(self) -> None: exception: Exception = self.get_state_changed_from_queue(self.error_queue) # type: ignore[assignment,arg-type] if isinstance(exception, Exception): @@ -341,12 +349,11 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque while (time() - t0) < self.state_accumulate_wait: # TODO: Fetch all available deltas at once to reduce queue calls. - delta: Optional[ - Union[_DeltaRequest, _APIRequest, _CommandRequest, ComponentDelta] - ] = self.get_state_changed_from_queue( + received_deltas = self.batch_get_state_changed_from_queue( self.delta_queue # type: ignore[assignment,arg-type] ) - if delta: + for delta in received_deltas: + print(delta) if isinstance(delta, _DeltaRequest): deltas.append(delta.delta) elif isinstance(delta, ComponentDelta): @@ -364,8 +371,6 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque deltas.append(delta) else: api_or_command_request_deltas.append(delta) - else: - break if api_or_command_request_deltas: _process_requests(self, api_or_command_request_deltas) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index da941ae72503e..5366ecd4f37d1 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -22,10 +22,11 @@ from pathlib import Path from typing import Any, Optional, Tuple from urllib.parse import urljoin - +import numpy as np import backoff import requests from requests.exceptions import ConnectionError, ConnectTimeout, ReadTimeout +import base64 from lightning.app.core.constants import ( HTTP_QUEUE_REFRESH_INTERVAL, @@ -189,6 +190,19 @@ def get(self, timeout: Optional[float] = None) -> Any: """ pass + @abstractmethod + def get_all(self, timeout: Optional[float] = None) -> Any: + """Returns the left most elements of the queue. + + Parameters + ---------- + timeout: + Read timeout in seconds, in case of input timeout is 0, the `self.default_timeout` is used. + A timeout of None can be used to block indefinitely. + + """ + pass + @property def is_running(self) -> bool: """Returns True if the queue is running, False otherwise. @@ -214,6 +228,11 @@ def get(self, timeout: Optional[float] = None) -> Any: timeout = self.default_timeout return self.queue.get(timeout=timeout, block=(timeout is None)) + def get_all(self, timeout: Optional[float] = None) -> Any: + if timeout == 0: + timeout = self.default_timeout + return [self.queue.get(timeout=timeout, block=(timeout is None))] + class RedisQueue(BaseQueue): @requires("redis") @@ -312,6 +331,9 @@ def get(self, timeout: Optional[float] = None) -> Any: raise queue.Empty return pickle.loads(out[1]) + def get_all(self, timeout: Optional[float] = None) -> Any: + raise NotImplementedError + def clear(self) -> None: """Clear all elements in the queue.""" self.redis.delete(self.name) @@ -366,7 +388,6 @@ def __init__(self, queue: BaseQueue, requests_per_second: float): self._seconds_per_request = 1 / requests_per_second self._last_get = 0.0 - self._last_put = 0.0 @property def is_running(self) -> bool: @@ -383,9 +404,12 @@ def get(self, timeout: Optional[float] = None) -> Any: self._last_get = time.time() return self._queue.get(timeout=timeout) + def get_all(self, timeout: Optional[float] = None) -> Any: + self._wait_until_allowed(self._last_get) + self._last_get = time.time() + return self._queue.get_all(timeout=timeout) + def put(self, item: Any) -> None: - self._wait_until_allowed(self._last_put) - self._last_put = time.time() return self._queue.put(item) @@ -477,6 +501,21 @@ def _get(self) -> Any: # we consider the queue is empty to avoid failing the app. raise queue.Empty + def get_all(self, timeout: Optional[float] = None) -> Any: + if not self.app_id: + raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}") + + try: + print("HERE") + resp = self.client.post(f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": "64"}) + if resp.status_code == 204: + raise queue.Empty + return [pickle.loads(base64.b64decode(data)) for data in resp.json()] + except ConnectionError: + # Note: If the Http Queue service isn't available, + # we consider the queue is empty to avoid failing the app. + raise queue.Empty + @backoff.on_exception(backoff.expo, (RuntimeError, requests.exceptions.HTTPError)) def put(self, item: Any) -> None: if not self.app_id: From 48abed66a4877af66e0b4f219b07c2e7c805beb3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 18 Dec 2023 17:36:03 +0000 Subject: [PATCH 02/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning/app/core/queues.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index 5366ecd4f37d1..e699be04a0ef6 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import multiprocessing import pickle import queue # needed as import instead from/import for mocking in tests @@ -22,11 +23,10 @@ from pathlib import Path from typing import Any, Optional, Tuple from urllib.parse import urljoin -import numpy as np + import backoff import requests from requests.exceptions import ConnectionError, ConnectTimeout, ReadTimeout -import base64 from lightning.app.core.constants import ( HTTP_QUEUE_REFRESH_INTERVAL, @@ -507,7 +507,9 @@ def get_all(self, timeout: Optional[float] = None) -> Any: try: print("HERE") - resp = self.client.post(f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": "64"}) + resp = self.client.post( + f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": "64"} + ) if resp.status_code == 204: raise queue.Empty return [pickle.loads(base64.b64decode(data)) for data in resp.json()] From 77c929f975b67b3192d4d894d267e88800dda538 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 17:42:34 +0000 Subject: [PATCH 03/11] update --- src/lightning/app/core/app.py | 4 +- src/lightning/app/core/constants.py | 2 + src/lightning/app/core/queues.py | 63 ++++++++++++++++++++++++----- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/src/lightning/app/core/app.py b/src/lightning/app/core/app.py index c3b0ccf9f89f1..f4bd16979ba0a 100644 --- a/src/lightning/app/core/app.py +++ b/src/lightning/app/core/app.py @@ -29,6 +29,7 @@ from lightning.app import _console from lightning.app.api.request_types import _APIRequest, _CommandRequest, _DeltaRequest from lightning.app.core.constants import ( + BATCH_DELTA_COUNT, DEBUG_ENABLED, FLOW_DURATION_SAMPLES, FLOW_DURATION_THRESHOLD, @@ -312,7 +313,7 @@ def get_state_changed_from_queue(q: BaseQueue, timeout: Optional[float] = None) def batch_get_state_changed_from_queue(q: BaseQueue, timeout: Optional[float] = None) -> List[dict]: try: timeout = timeout or q.default_timeout - return q.get_all(timeout=timeout) + return q.batch_get(timeout=timeout, count=BATCH_DELTA_COUNT) except queue.Empty: return [] @@ -353,7 +354,6 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque self.delta_queue # type: ignore[assignment,arg-type] ) for delta in received_deltas: - print(delta) if isinstance(delta, _DeltaRequest): deltas.append(delta.delta) elif isinstance(delta, ComponentDelta): diff --git a/src/lightning/app/core/constants.py b/src/lightning/app/core/constants.py index cc23ebd645c24..566fc87bc9438 100644 --- a/src/lightning/app/core/constants.py +++ b/src/lightning/app/core/constants.py @@ -98,6 +98,8 @@ def get_lightning_cloud_url() -> str: # directory where system customization sync files will be copied to be packed into app tarball SYS_CUSTOMIZATIONS_SYNC_PATH = ".sys-customizations-sync" +BATCH_DELTA_COUNT = int(os.getenv("BATCH_DELTA_COUNT", "128")) + def enable_multiple_works_in_default_container() -> bool: return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index 5366ecd4f37d1..97df736452852 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import multiprocessing import pickle import queue # needed as import instead from/import for mocking in tests @@ -22,11 +23,10 @@ from pathlib import Path from typing import Any, Optional, Tuple from urllib.parse import urljoin -import numpy as np + import backoff import requests from requests.exceptions import ConnectionError, ConnectTimeout, ReadTimeout -import base64 from lightning.app.core.constants import ( HTTP_QUEUE_REFRESH_INTERVAL, @@ -191,7 +191,7 @@ def get(self, timeout: Optional[float] = None) -> Any: pass @abstractmethod - def get_all(self, timeout: Optional[float] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: """Returns the left most elements of the queue. Parameters @@ -228,9 +228,10 @@ def get(self, timeout: Optional[float] = None) -> Any: timeout = self.default_timeout return self.queue.get(timeout=timeout, block=(timeout is None)) - def get_all(self, timeout: Optional[float] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: if timeout == 0: timeout = self.default_timeout + # For multiprocessing, we can simply collect the latest upmost element return [self.queue.get(timeout=timeout, block=(timeout is None))] @@ -331,8 +332,8 @@ def get(self, timeout: Optional[float] = None) -> Any: raise queue.Empty return pickle.loads(out[1]) - def get_all(self, timeout: Optional[float] = None) -> Any: - raise NotImplementedError + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: + raise NotImplementedError("The batch_get method isn't implemented.") def clear(self) -> None: """Clear all elements in the queue.""" @@ -404,10 +405,10 @@ def get(self, timeout: Optional[float] = None) -> Any: self._last_get = time.time() return self._queue.get(timeout=timeout) - def get_all(self, timeout: Optional[float] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: self._wait_until_allowed(self._last_get) self._last_get = time.time() - return self._queue.get_all(timeout=timeout) + return self._queue.batch_get(timeout=timeout) def put(self, item: Any) -> None: return self._queue.put(item) @@ -501,13 +502,53 @@ def _get(self) -> Any: # we consider the queue is empty to avoid failing the app. raise queue.Empty - def get_all(self, timeout: Optional[float] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> list[Any]: if not self.app_id: raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}") + # it's a blocking call, we need to loop and call the backend to mimic this behavior + if timeout is None: + while True: + try: + try: + return self._batch_get(count=count) + except requests.exceptions.HTTPError: + pass + except queue.Empty: + time.sleep(HTTP_QUEUE_REFRESH_INTERVAL) + + # make one request and return the result + if timeout == 0: + try: + return self._batch_get(count=count) + except requests.exceptions.HTTPError: + return [] + + # timeout is some value - loop until the timeout is reached + start_time = time.time() + while (time.time() - start_time) < timeout: + try: + try: + return self._batch_get(count=count) + except requests.exceptions.HTTPError: + if timeout > self.default_timeout: + return [] + raise queue.Empty + except queue.Empty: + # Note: In theory, there isn't a need for a sleep as the queue shouldn't + # block the flow if the queue is empty. + # However, as the Http Server can saturate, + # let's add a sleep here if a higher timeout is provided + # than the default timeout + if timeout > self.default_timeout: + time.sleep(0.05) + return [] + + def _batch_get(self, count: Optional[int] = 64) -> list[Any]: try: - print("HERE") - resp = self.client.post(f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": "64"}) + resp = self.client.post( + f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": str(count)} + ) if resp.status_code == 204: raise queue.Empty return [pickle.loads(base64.b64decode(data)) for data in resp.json()] From 97b44e85144ad77732b0bcf10184290e479853dc Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Mon, 18 Dec 2023 17:55:39 +0000 Subject: [PATCH 04/11] update --- src/lightning/app/core/app.py | 1 + src/lightning/app/core/queues.py | 45 ++------------------------------ 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/src/lightning/app/core/app.py b/src/lightning/app/core/app.py index c7c8ef2244d7c..5a164ab4b527b 100644 --- a/src/lightning/app/core/app.py +++ b/src/lightning/app/core/app.py @@ -355,6 +355,7 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque ] = self.batch_get_state_changed_from_queue( self.delta_queue # type: ignore[assignment,arg-type] ) + print(received_deltas) for delta in received_deltas: if isinstance(delta, _DeltaRequest): deltas.append(delta.delta) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index 97df736452852..592de835dc9f6 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -41,6 +41,7 @@ REDIS_QUEUES_READ_DEFAULT_TIMEOUT, STATE_UPDATE_TIMEOUT, WARNING_QUEUE_SIZE, + BATCH_DELTA_COUNT, ) from lightning.app.utilities.app_helpers import Logger from lightning.app.utilities.imports import _is_redis_available, requires @@ -503,51 +504,9 @@ def _get(self) -> Any: raise queue.Empty def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> list[Any]: - if not self.app_id: - raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}") - - # it's a blocking call, we need to loop and call the backend to mimic this behavior - if timeout is None: - while True: - try: - try: - return self._batch_get(count=count) - except requests.exceptions.HTTPError: - pass - except queue.Empty: - time.sleep(HTTP_QUEUE_REFRESH_INTERVAL) - - # make one request and return the result - if timeout == 0: - try: - return self._batch_get(count=count) - except requests.exceptions.HTTPError: - return [] - - # timeout is some value - loop until the timeout is reached - start_time = time.time() - while (time.time() - start_time) < timeout: - try: - try: - return self._batch_get(count=count) - except requests.exceptions.HTTPError: - if timeout > self.default_timeout: - return [] - raise queue.Empty - except queue.Empty: - # Note: In theory, there isn't a need for a sleep as the queue shouldn't - # block the flow if the queue is empty. - # However, as the Http Server can saturate, - # let's add a sleep here if a higher timeout is provided - # than the default timeout - if timeout > self.default_timeout: - time.sleep(0.05) - return [] - - def _batch_get(self, count: Optional[int] = 64) -> list[Any]: try: resp = self.client.post( - f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": str(count)} + f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": str(count or BATCH_DELTA_COUNT)} ) if resp.status_code == 204: raise queue.Empty From 4c16e70a098314eff739785a6611a7351276bc7e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 18 Dec 2023 17:56:57 +0000 Subject: [PATCH 05/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning/app/core/queues.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index 592de835dc9f6..30e0abef03c77 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -29,6 +29,7 @@ from requests.exceptions import ConnectionError, ConnectTimeout, ReadTimeout from lightning.app.core.constants import ( + BATCH_DELTA_COUNT, HTTP_QUEUE_REFRESH_INTERVAL, HTTP_QUEUE_REQUESTS_PER_SECOND, HTTP_QUEUE_TOKEN, @@ -41,7 +42,6 @@ REDIS_QUEUES_READ_DEFAULT_TIMEOUT, STATE_UPDATE_TIMEOUT, WARNING_QUEUE_SIZE, - BATCH_DELTA_COUNT, ) from lightning.app.utilities.app_helpers import Logger from lightning.app.utilities.imports import _is_redis_available, requires @@ -506,7 +506,8 @@ def _get(self) -> Any: def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> list[Any]: try: resp = self.client.post( - f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "popCount", "count": str(count or BATCH_DELTA_COUNT)} + f"v1/{self.app_id}/{self._name_suffix}", + query_params={"action": "popCount", "count": str(count or BATCH_DELTA_COUNT)}, ) if resp.status_code == 204: raise queue.Empty From 3cd82cd1384a4d18c010cb98fc68a6c3232d3f42 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 18:09:28 +0000 Subject: [PATCH 06/11] update --- tests/tests_app/core/test_queues.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/tests_app/core/test_queues.py b/tests/tests_app/core/test_queues.py index 583e828b12430..0f68d8aa1ff98 100644 --- a/tests/tests_app/core/test_queues.py +++ b/tests/tests_app/core/test_queues.py @@ -1,3 +1,4 @@ +import base64 import multiprocessing import pickle import queue @@ -220,6 +221,24 @@ def test_http_queue_get(self, monkeypatch): ) assert test_queue.get() == "test" + def test_http_queue_batch_get(self, monkeypatch): + monkeypatch.setattr(queues, "HTTP_QUEUE_TOKEN", "test-token") + test_queue = HTTPQueue("test_http_queue", STATE_UPDATE_TIMEOUT) + adapter = requests_mock.Adapter() + test_queue.client.session.mount("http://", adapter) + + adapter.register_uri( + "POST", + f"{HTTP_QUEUE_URL}/v1/test/http_queue?action=popCount", + request_headers={"Authorization": "Bearer test-token"}, + status_code=200, + json=[ + base64.b64encode(pickle.dumps("test")).decode("utf-8"), + base64.b64encode(pickle.dumps("test2")).decode("utf-8"), + ], + ) + assert test_queue.batch_get() == ["test", "test2"] + def test_unreachable_queue(monkeypatch): monkeypatch.setattr(queues, "HTTP_QUEUE_TOKEN", "test-token") From cfd17b9a33221b80676874ddff98cba8a3dcd8cb Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 18:10:31 +0000 Subject: [PATCH 07/11] update --- src/lightning/app/core/app.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lightning/app/core/app.py b/src/lightning/app/core/app.py index 5a164ab4b527b..c7c8ef2244d7c 100644 --- a/src/lightning/app/core/app.py +++ b/src/lightning/app/core/app.py @@ -355,7 +355,6 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque ] = self.batch_get_state_changed_from_queue( self.delta_queue # type: ignore[assignment,arg-type] ) - print(received_deltas) for delta in received_deltas: if isinstance(delta, _DeltaRequest): deltas.append(delta.delta) From 7d09933cd9ee4f2003aa335cb6cda5ce5b4585dd Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 18:15:29 +0000 Subject: [PATCH 08/11] update --- src/lightning/app/core/queues.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index 30e0abef03c77..f78d6cf8733d3 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -21,7 +21,7 @@ from abc import ABC, abstractmethod from enum import Enum from pathlib import Path -from typing import Any, Optional, Tuple +from typing import Any, List, Optional, Tuple from urllib.parse import urljoin import backoff @@ -192,7 +192,7 @@ def get(self, timeout: Optional[float] = None) -> Any: pass @abstractmethod - def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> List[Any]: """Returns the left most elements of the queue. Parameters @@ -200,9 +200,10 @@ def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None timeout: Read timeout in seconds, in case of input timeout is 0, the `self.default_timeout` is used. A timeout of None can be used to block indefinitely. + count: + The number of element to get from the queue """ - pass @property def is_running(self) -> bool: @@ -229,7 +230,7 @@ def get(self, timeout: Optional[float] = None) -> Any: timeout = self.default_timeout return self.queue.get(timeout=timeout, block=(timeout is None)) - def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> List[Any]: if timeout == 0: timeout = self.default_timeout # For multiprocessing, we can simply collect the latest upmost element @@ -503,7 +504,7 @@ def _get(self) -> Any: # we consider the queue is empty to avoid failing the app. raise queue.Empty - def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> list[Any]: + def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> List[Any]: try: resp = self.client.post( f"v1/{self.app_id}/{self._name_suffix}", From 067db6915f3866e5d8cc30d084086bade9da026b Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 18:16:22 +0000 Subject: [PATCH 09/11] update --- .../app/utilities/packaging/lightning_utils.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/lightning/app/utilities/packaging/lightning_utils.py b/src/lightning/app/utilities/packaging/lightning_utils.py index 9e5493f332e3e..e8846b382e49e 100644 --- a/src/lightning/app/utilities/packaging/lightning_utils.py +++ b/src/lightning/app/utilities/packaging/lightning_utils.py @@ -150,6 +150,16 @@ def _prepare_lightning_wheels_and_requirements(root: Path, package_name: str = " tar_name = _copy_tar(lightning_cloud_project_path, root) tar_files.append(os.path.join(root, tar_name)) + lightning_launcher_project_path = get_dist_path_if_editable_install("lightning_launcher") + if lightning_launcher_project_path: + from lightning_launcher.__version__ import __version__ as cloud_version + + # todo: check why logging.info is missing in outputs + print(f"Packaged Lightning Launcher with your application. Version: {cloud_version}") + _prepare_wheel(lightning_launcher_project_path) + tar_name = _copy_tar(lightning_launcher_project_path, root) + tar_files.append(os.path.join(root, tar_name)) + return functools.partial(_cleanup, *tar_files) From b0858bacc09534e2d547465fc6a56d6309b853f4 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 18:47:31 +0000 Subject: [PATCH 10/11] update --- src/lightning/app/testing/helpers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lightning/app/testing/helpers.py b/src/lightning/app/testing/helpers.py index 7f87180c959f3..61a00f957299e 100644 --- a/src/lightning/app/testing/helpers.py +++ b/src/lightning/app/testing/helpers.py @@ -142,6 +142,11 @@ def get(self, timeout: int = 0): raise Empty() return self._queue.pop(0) + def batch_get(self, timeout: int = 0, count: int = None): + if not self._queue: + raise Empty() + return [self._queue.pop(0)] + def __contains__(self, item): return item in self._queue From c3a59e15401ab209500e92306b169a753686633d Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 18 Dec 2023 19:12:10 +0000 Subject: [PATCH 11/11] update --- src/lightning/app/core/app.py | 3 +++ src/lightning/app/core/queues.py | 2 +- tests/tests_app/core/test_lightning_app.py | 13 +++++++------ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/lightning/app/core/app.py b/src/lightning/app/core/app.py index c7c8ef2244d7c..ed4c1d5c76f3f 100644 --- a/src/lightning/app/core/app.py +++ b/src/lightning/app/core/app.py @@ -355,6 +355,9 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque ] = self.batch_get_state_changed_from_queue( self.delta_queue # type: ignore[assignment,arg-type] ) + if len(received_deltas) == []: + break + for delta in received_deltas: if isinstance(delta, _DeltaRequest): deltas.append(delta.delta) diff --git a/src/lightning/app/core/queues.py b/src/lightning/app/core/queues.py index f78d6cf8733d3..d37251c824616 100644 --- a/src/lightning/app/core/queues.py +++ b/src/lightning/app/core/queues.py @@ -335,7 +335,7 @@ def get(self, timeout: Optional[float] = None) -> Any: return pickle.loads(out[1]) def batch_get(self, timeout: Optional[float] = None, count: Optional[int] = None) -> Any: - raise NotImplementedError("The batch_get method isn't implemented.") + return [self.get(timeout=timeout)] def clear(self) -> None: """Clear all elements in the queue.""" diff --git a/tests/tests_app/core/test_lightning_app.py b/tests/tests_app/core/test_lightning_app.py index 86f71d6f09154..d529b373e4df0 100644 --- a/tests/tests_app/core/test_lightning_app.py +++ b/tests/tests_app/core/test_lightning_app.py @@ -446,8 +446,8 @@ def run(self): @pytest.mark.parametrize( ("sleep_time", "expect"), [ - (1, 0), - pytest.param(0, 10.0, marks=pytest.mark.xfail(strict=False, reason="failing...")), # fixme + (0, 9), + pytest.param(9, 10.0, marks=pytest.mark.xfail(strict=False, reason="failing...")), # fixme ], ) @pytest.mark.flaky(reruns=5) @@ -456,10 +456,10 @@ def test_lightning_app_aggregation_speed(default_timeout, queue_type_cls: BaseQu time window.""" class SlowQueue(queue_type_cls): - def get(self, timeout): + def batch_get(self, timeout, count): out = super().get(timeout) sleep(sleep_time) - return out + return [out] app = LightningApp(EmptyFlow()) @@ -480,7 +480,7 @@ def make_delta(i): delta = app._collect_deltas_from_ui_and_work_queues()[-1] generated = delta.to_dict()["values_changed"]["root['vars']['counter']"]["new_value"] if sleep_time: - assert generated == expect + assert generated == expect, (generated, expect) else: # validate the flow should have aggregated at least expect. assert generated > expect @@ -497,7 +497,8 @@ def get(self, timeout): app.delta_queue = SlowQueue("api_delta_queue", 0) t0 = time() assert app._collect_deltas_from_ui_and_work_queues() == [] - assert (time() - t0) < app.state_accumulate_wait + delta = time() - t0 + assert delta < app.state_accumulate_wait + 0.01, delta class SimpleFlow2(LightningFlow):