From bdeb1f54bde2f08f317df0fe3b20c6bf5db283c7 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 17:19:00 +0000 Subject: [PATCH 01/11] Update src/lightning_app/components/serve/__init__.py --- src/lightning_app/components/serve/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 6a125ac3fda04..dbeb7f8372766 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,4 +1,5 @@ from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler +from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] From 3d3b5929eca0fbdf360fa5d9a6923b579690b160 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:21:49 +0000 Subject: [PATCH 02/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/__init__.py | 2 +- src/lightning_app/components/serve/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 9c94e7dfe009f..8182fa4f57826 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -9,7 +9,7 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler +from 
lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index dbeb7f8372766..7ac1d79b05659 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,5 @@ -from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler from lightning_app.components.serve.gradio import ServeGradio +from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] From c38768a9ba80304262bfa43a6bafcb7969743a0c Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 17:51:31 +0000 Subject: [PATCH 03/11] test --- src/lightning_app/components/__init__.py | 3 ++- src/lightning_app/components/serve/__init__.py | 3 ++- .../components/serve/auto_scaler.py | 4 ++++ .../components/serve/test_auto_scaler.py | 18 +++++++++++++++++- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 8182fa4f57826..801365c6acc63 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -9,7 +9,8 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from 
lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 7ac1d79b05659..34785f820c2eb 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,6 @@ from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 1fce191d7603f..6027249de850f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -346,6 +346,10 @@ def send_request_to_update_servers(self, servers: List[str]): @staticmethod def _get_sample_dict_from_datatype(datatype: Any) -> dict: + if not hasattr(datatype, "schema"): + # not a pydantic model + raise TypeError(f"datatype must be a pydantic model, for the UI to be generated. 
but got {datatype}") + if hasattr(datatype, "_get_sample_data"): return datatype._get_sample_data() diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 672b05bbc9a15..2e6fa84d6ccc6 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -1,10 +1,11 @@ import time +from unittest import mock from unittest.mock import patch import pytest from lightning_app import CloudCompute, LightningWork -from lightning_app.components import AutoScaler +from lightning_app.components import AutoScaler, Text class EmptyWork(LightningWork): @@ -98,3 +99,18 @@ def test_create_work_cloud_compute_cloned(): auto_scaler = AutoScaler(EmptyWork, cloud_compute=cloud_compute) _ = auto_scaler.create_work() assert auto_scaler._work_kwargs["cloud_compute"] is not cloud_compute + + +fastapi_mock = mock.MagicMock() +mocked_fastapi_creater = mock.MagicMock(return_value=fastapi_mock) + + +@patch("lightning_app.components.serve.auto_scaler._create_fastapi", mocked_fastapi_creater) +@patch("lightning_app.components.serve.auto_scaler.uvicorn.run", mock.MagicMock()) +def test_API_ACCESS_ENDPOINT_creation(): + auto_scaler = AutoScaler(EmptyWork, input_type=Text, output_type=Text) + assert auto_scaler.load_balancer._work_name == "EmptyWork" + + auto_scaler.load_balancer.run() + fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") + From 98ba462dfa78c5ece64a6842d827327e86ad99eb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:53:03 +0000 Subject: [PATCH 04/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/__init__.py | 2 +- src/lightning_app/components/serve/__init__.py | 2 +- tests/tests_app/components/serve/test_auto_scaler.py | 1 - 3 files changed, 2 
insertions(+), 3 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 801365c6acc63..5fd8af6b055de 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -8,9 +8,9 @@ ) from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text -from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 34785f820c2eb..ac02e69c4f2ab 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,6 +1,6 @@ +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text -from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 2e6fa84d6ccc6..d1adb1c5aacd2 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -113,4 
+113,3 @@ def test_API_ACCESS_ENDPOINT_creation(): auto_scaler.load_balancer.run() fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") - From 1d1ce43d11561b0d27bf7f92f296b5b6a4de93e0 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 07:16:15 +0000 Subject: [PATCH 05/11] resetting batches properly --- src/lightning_app/components/serve/auto_scaler.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 6027249de850f..7d0114069f797 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -185,15 +185,13 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): async def consumer(self): while True: await asyncio.sleep(0.05) - - batch = self._batch[: self.max_batch_size] - while batch and ( - (len(batch) == self.max_batch_size) or ((time.time() - self._last_batch_sent) > self.timeout_batching) - ): + batch = self._batch[:self.max_batch_size] + is_batch_ready = len(batch) == self.max_batch_size + is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching + if batch and (is_batch_ready or is_batch_timeout): asyncio.create_task(self.send_batch(batch)) - - self._batch = self._batch[self.max_batch_size :] - batch = self._batch[: self.max_batch_size] + # resetting the batch array, TODO - not locking the array + self._batch = self._batch[len(batch):] self._last_batch_sent = time.time() async def process_request(self, data: BaseModel): From adf33d13934144b209b65f8e81b401475c407768 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 07:20:22 +0000 Subject: [PATCH 06/11] fixes to imports --- pyproject.toml | 2 +- tests/tests_app/components/serve/test_auto_scaler.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 
8611ef9323deb..4461d956634c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,8 +79,8 @@ module = [ "lightning_app.components.serve.types.image", "lightning_app.components.serve.types.type", "lightning_app.components.serve.python_server", + "lightning_app.components.serve.auto_scaler", "lightning_app.components.training", - "lightning_app.components.auto_scaler", "lightning_app.core.api", "lightning_app.core.app", "lightning_app.core.flow", diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index d1adb1c5aacd2..6bd5aa958b6bf 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -33,8 +33,8 @@ def test_num_replicas_after_init(): @patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") +@patch("lightning_app.components.serve.auto_scaler._LoadBalancer.url") +@patch("lightning_app.components.serve.auto_scaler.AutoScaler.num_pending_requests") def test_num_replicas_not_above_max_replicas(*_): """Test self.num_replicas doesn't exceed max_replicas.""" max_replicas = 6 @@ -53,8 +53,8 @@ def test_num_replicas_not_above_max_replicas(*_): @patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") +@patch("lightning_app.components.serve.auto_scaler._LoadBalancer.url") +@patch("lightning_app.components.serve.auto_scaler.AutoScaler.num_pending_requests") def test_num_replicas_not_belo_min_replicas(*_): """Test self.num_replicas doesn't exceed max_replicas.""" min_replicas = 1 From af1569de585be00f2e372303e1bc4d8082b94bef Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 19 Dec 2022 12:58:38 +0000 Subject: [PATCH 07/11] [pre-commit.ci] auto fixes 
from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/serve/auto_scaler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 7d0114069f797..36ae720ae8943 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -185,13 +185,13 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): async def consumer(self): while True: await asyncio.sleep(0.05) - batch = self._batch[:self.max_batch_size] + batch = self._batch[: self.max_batch_size] is_batch_ready = len(batch) == self.max_batch_size is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching if batch and (is_batch_ready or is_batch_timeout): asyncio.create_task(self.send_batch(batch)) # resetting the batch array, TODO - not locking the array - self._batch = self._batch[len(batch):] + self._batch = self._batch[len(batch) :] self._last_batch_sent = time.time() async def process_request(self, data: BaseModel): From ddbb3d7d2ed4fb26f550a72b7f624f761da7676b Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 15:27:39 +0000 Subject: [PATCH 08/11] setting the last sent time --- src/lightning_app/components/serve/auto_scaler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 36ae720ae8943..9d64100cc656f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -183,6 +183,7 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): self._responses.update(result) async def consumer(self): + self._last_batch_sent = time.time() while True: await asyncio.sleep(0.05) batch = self._batch[: self.max_batch_size] From 
a558a0550a13d49da7fac37e29aba9608fae03cb Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 17:16:41 +0000 Subject: [PATCH 09/11] attempt --- .../components/serve/auto_scaler.py | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index f9269ea185f0a..f169f45cca6d8 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -146,14 +146,14 @@ def __init__( self._responses = {} # {request_id: response} self._last_batch_sent = 0 self._work_name = work_name + self._server_status = {} if not endpoint.startswith("/"): endpoint = "/" + endpoint self.endpoint = endpoint - async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): - server = next(self._iter) # round-robin + async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]], server_url: str): request_data: List[_LoadBalancer._input_type] = [b[1] for b in batch] batch_request_data = _BatchRequestModel(inputs=request_data) @@ -164,11 +164,17 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): "Content-Type": "application/json", } async with session.post( - f"{server}{self.endpoint}", + f"{server_url}{self.endpoint}", json=batch_request_data.dict(), timeout=self._timeout_inference_request, headers=headers, ) as response: + # resetting the server status so other requests can be + # scheduled on this node + if server_url in self._server_status: + # TODO - if the server returns an error, track that so + # we don't send more requests to it + self._server_status[server_url] = True if response.status == 408: raise HTTPException(408, "Request timed out") response.raise_for_status() @@ -182,6 +188,14 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): result = {request[0]: ex for request in batch} self._responses.update(result) + def 
_find_free_server(self) -> Optional[str]: + for server in self._server_status: + status = self._server_status.get(server, None) + if status is None: + logger.error("Server is not found in the status list. This should not happen.") + if status: + return server + async def consumer(self): self._last_batch_sent = time.time() while True: @@ -189,8 +203,19 @@ async def consumer(self): batch = self._batch[: self.max_batch_size] is_batch_ready = len(batch) == self.max_batch_size is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching + server_url = self._find_free_server() + # setting the server status to be busy! This will be reset by + # the send_batch function after the server responds + self._server_status[server_url] = False + if server_url is None: + # TODO - a timeout until we try looking for servers + logger.error("No servers available") + continue if batch and (is_batch_ready or is_batch_timeout): - asyncio.create_task(self.send_batch(batch)) + # find server with capacity + # TODO multiple instances of consumer should not be running + # without locking the server array + asyncio.create_task(self.send_batch(batch, server_url)) # resetting the batch array, TODO - not locking the array self._batch = self._batch[len(batch) :] self._last_batch_sent = time.time() @@ -280,6 +305,14 @@ async def update_servers(servers: List[str], authenticated: bool = Depends(authe async with lock: self.servers = servers self._iter = cycle(self.servers) + updated_servers = set() + for server in servers: + updated_servers.add(server) + if server not in self._server_status: + self._server_status[server] = True + for existing in self._server_status: + if existing not in updated_servers: + del self._server_status[existing] @fastapi_app.post(self.endpoint, response_model=self._output_type) async def balance_api(inputs: input_type): From 19ce664ac869c4ccbf5063202f255a6e8c674a65 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 19:08:46 +0000 Subject: 
[PATCH 10/11] fixes after testing --- src/lightning_app/components/serve/auto_scaler.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index f169f45cca6d8..d0c0c40f9953a 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -158,6 +158,7 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]], server_u batch_request_data = _BatchRequestModel(inputs=request_data) try: + self._server_status[server_url] = False async with aiohttp.ClientSession() as session: headers = { "accept": "application/json", @@ -187,9 +188,12 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]], server_u except Exception as ex: result = {request[0]: ex for request in batch} self._responses.update(result) + finally: + self._server_status[server_url] = True def _find_free_server(self) -> Optional[str]: - for server in self._server_status: + existing = set(self._server_status.keys()) + for server in existing: status = self._server_status.get(server, None) if status is None: logger.error("Server is not found in the status list. This should not happen.") @@ -206,7 +210,6 @@ async def consumer(self): server_url = self._find_free_server() # setting the server status to be busy! 
This will be reset by # the send_batch function after the server responds - self._server_status[server_url] = False if server_url is None: # TODO - a timeout until we try looking for servers logger.error("No servers available") @@ -306,12 +309,16 @@ async def update_servers(servers: List[str], authenticated: bool = Depends(authe self.servers = servers self._iter = cycle(self.servers) updated_servers = set() + # do not try to loop over the dict keys as the dict might change from other places + existing_servers = list(self._server_status.keys()) for server in servers: updated_servers.add(server) - if server not in self._server_status: + if server not in existing_servers: self._server_status[server] = True - for existing in self._server_status: + logger.info(f"Registering server {server}", self._server_status) + for existing in existing_servers: if existing not in updated_servers: + logger.info(f"De-Registering server {existing}", self._server_status) del self._server_status[existing] @fastapi_app.post(self.endpoint, response_model=self._output_type) From 2309e1137ac155f4b803f763514aa1e1241122eb Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 19:11:47 +0000 Subject: [PATCH 11/11] changelog --- src/lightning_app/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 65d62b29daca5..ffa1ece29ea4b 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -59,6 +59,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Fixed a bug where `AutoScaler` would fail with min_replica=0 ([#16092](https://github.com/Lightning-AI/lightning/pull/16092) +- Fixed auto-batching so that requests still waiting in the queue after the batch interval has elapsed are also batched ([#16110](https://github.com/Lightning-AI/lightning/pull/16110)) + ## [1.8.4] - 2022-12-08