Skip to content

Commit 0250c19

Browse files
authored
[App] Resolve bi-directional queue bug (#15642)
1 parent 0dfb3d2 commit 0250c19

File tree

6 files changed

+58
-15
lines changed

6 files changed

+58
-15
lines changed

src/lightning_app/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
7171
- Fixed race condition to over-write the frontend with app infos ([#15398](https://github.com/Lightning-AI/lightning/pull/15398))
7272

7373

74+
- Fixed bi-directional queues sending delta with Drive Component name changes ([#15642](https://github.com/Lightning-AI/lightning/pull/15642))
75+
76+
7477

7578
## [1.8.0] - 2022-11-01
7679

src/lightning_app/cli/commands/logs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def _show_logs(app_name: str, components: List[str], follow: bool) -> None:
7171
works = client.lightningwork_service_list_lightningwork(
7272
project_id=project.project_id, app_id=apps[app_name].id
7373
).lightningworks
74+
7475
app_component_names = ["flow"] + [f.name for f in apps[app_name].spec.flow_servers] + [w.name for w in works]
7576

7677
if not components:

src/lightning_app/components/serve/python_server.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import abc
22
import base64
3+
import os
34
from pathlib import Path
45
from typing import Any, Dict, Optional
56

@@ -14,12 +15,6 @@
1415
logger = Logger(__name__)
1516

1617

17-
def image_to_base64(image_path):
18-
with open(image_path, "rb") as image_file:
19-
encoded_string = base64.b64encode(image_file.read())
20-
return encoded_string.decode("UTF-8")
21-
22-
2318
class _DefaultInputData(BaseModel):
2419
payload: str
2520

@@ -33,7 +28,8 @@ class Image(BaseModel):
3328

3429
@staticmethod
3530
def _get_sample_data() -> Dict[Any, Any]:
36-
imagepath = Path(__file__).absolute().parent / "catimage.png"
31+
name = "lightning" + "_" + "app"
32+
imagepath = Path(__file__.replace(f"lightning{os.sep}app", name)).absolute().parent / "catimage.png"
3733
with open(imagepath, "rb") as image_file:
3834
encoded_string = base64.b64encode(image_file.read())
3935
return {"image": encoded_string.decode("UTF-8")}

src/lightning_app/core/app.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from lightning_app.core.queues import BaseQueue, SingleProcessQueue
2525
from lightning_app.core.work import LightningWork
2626
from lightning_app.frontend import Frontend
27-
from lightning_app.storage import Drive, Path
27+
from lightning_app.storage import Drive, Path, Payload
2828
from lightning_app.storage.path import _storage_root_dir
2929
from lightning_app.utilities import frontend
3030
from lightning_app.utilities.app_helpers import (
@@ -630,8 +630,16 @@ def _extract_vars_from_component_name(component_name: str, state):
630630
else:
631631
return None
632632

633-
# Note: Remove private keys
634-
return {k: v for k, v in child["vars"].items() if not k.startswith("_")}
633+
# Filter private keys and drives
634+
return {
635+
k: v
636+
for k, v in child["vars"].items()
637+
if (
638+
not k.startswith("_")
639+
and not (isinstance(v, dict) and v.get("type", None) == "__drive__")
640+
and not (isinstance(v, (Payload, Path)))
641+
)
642+
}
635643

636644
def _send_flow_to_work_deltas(self, state) -> None:
637645
if not self.flow_to_work_delta_queues:
@@ -652,10 +660,6 @@ def _send_flow_to_work_deltas(self, state) -> None:
652660
if state_work is None or last_state_work is None:
653661
continue
654662

655-
# Note: The flow shouldn't update path or drive manually.
656-
last_state_work = apply_to_collection(last_state_work, (Path, Drive), lambda x: None)
657-
state_work = apply_to_collection(state_work, (Path, Drive), lambda x: None)
658-
659663
deep_diff = DeepDiff(last_state_work, state_work, verbose_level=2).to_dict()
660664

661665
if "unprocessed" in deep_diff:

src/lightning_app/testing/testing.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,18 @@ def fetch_logs(component_names: Optional[List[str]] = None) -> Generator:
431431
project_id=project.project_id,
432432
app_id=app_id,
433433
).lightningworks
434+
434435
component_names = ["flow"] + [w.name for w in works]
436+
else:
437+
438+
def add_prefix(c: str) -> str:
439+
if c == "flow":
440+
return c
441+
if not c.startswith("root."):
442+
return "root." + c
443+
return c
444+
445+
component_names = [add_prefix(c) for c in component_names]
435446

436447
gen = _app_logs_reader(
437448
logs_api_client=logs_api_client,

tests/tests_app/utilities/test_proxies.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from lightning_app import LightningApp, LightningFlow, LightningWork
1616
from lightning_app.runners import MultiProcessRuntime
17-
from lightning_app.storage import Path
17+
from lightning_app.storage import Drive, Path
1818
from lightning_app.storage.path import _artifacts_path
1919
from lightning_app.storage.requests import _GetRequest
2020
from lightning_app.testing.helpers import _MockQueue, EmptyFlow
@@ -761,3 +761,31 @@ def test_bi_directional_proxy_forbidden(monkeypatch):
761761
MultiProcessRuntime(app, start_server=False).dispatch()
762762
assert app.stage == AppStage.FAILED
763763
assert "A forbidden operation to update the work" in str(app.exception)
764+
765+
766+
class WorkDrive(LightningFlow):
767+
def __init__(self, drive):
768+
super().__init__()
769+
self.drive = drive
770+
self.path = Path("data")
771+
772+
def run(self):
773+
pass
774+
775+
776+
class FlowDrive(LightningFlow):
777+
def __init__(self):
778+
super().__init__()
779+
self.data = Drive("lit://data")
780+
self.counter = 0
781+
782+
def run(self):
783+
if not hasattr(self, "w"):
784+
self.w = WorkDrive(self.data)
785+
self.counter += 1
786+
787+
788+
def test_bi_directional_proxy_filtering():
789+
app = LightningApp(FlowDrive())
790+
app.root.run()
791+
assert app._extract_vars_from_component_name(app.root.w.name, app.state) == {}

0 commit comments

Comments
 (0)