Skip to content

Commit 64df956

Browse files
committed
fix: resolve Command(resume) warning about invalid packet type dict (#83)
Issue #83: Command with resume argument failing in v0.1.0 Users reported warning 'Ignoring invalid packet type <class 'dict'> in pending sends' when using Command(resume={'interrupt_id': {'some': 'result'}}) after upgrading from 0.0.8 to 0.1.0. Root cause: Type annotation mismatch in _load_pending_sends methods. They were annotated as returning List[Tuple[str, bytes]] but Redis JSON actually returns strings for blob data, not bytes, causing List[Tuple[str, Union[str, bytes]]]. This type mismatch caused the warning when Command(resume) tried to process pending sends containing dict values through the TASKS channel. Changes: - Updated return type hints for _load_pending_sends methods in both sync and async - Updated _load_pending_sends_with_registry_check type hints - Updated _abatch_load_pending_sends and local variable annotations in async - Added test that simulates Command(resume) scenario and verifies no warning - Added test for type compatibility with Redis JSON string blobs The fix ensures Command(resume) works without warnings while maintaining backward compatibility with code that passes bytes.
1 parent 13ddc96 commit 64df956

File tree

3 files changed

+145
-6
lines changed

3 files changed

+145
-6
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,7 @@ def _load_pending_sends(
11551155
thread_id: str,
11561156
checkpoint_ns: str,
11571157
parent_checkpoint_id: str,
1158-
) -> List[Tuple[str, bytes]]:
1158+
) -> List[Tuple[str, Union[str, bytes]]]:
11591159
"""Load pending sends for a parent checkpoint.
11601160
11611161
Args:
@@ -1437,7 +1437,7 @@ def _load_pending_sends_with_registry_check(
14371437
thread_id: str,
14381438
checkpoint_ns: str,
14391439
parent_checkpoint_id: str,
1440-
) -> List[Tuple[str, bytes]]:
1440+
) -> List[Tuple[str, Union[str, bytes]]]:
14411441
"""Load pending sends for a parent checkpoint with pre-computed registry check."""
14421442
if not parent_checkpoint_id:
14431443
return []

langgraph/checkpoint/redis/aio.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
541541
if doc_parent_checkpoint_id:
542542
results = await asyncio.gather(*tasks)
543543
channel_values: Dict[str, Any] = results[0]
544-
pending_sends: List[Tuple[str, bytes]] = results[1]
544+
pending_sends: List[Tuple[str, Union[str, bytes]]] = results[1]
545545
pending_writes: List[PendingWrite] = results[2]
546546
else:
547547
# Only channel_values and pending_writes tasks
@@ -772,7 +772,7 @@ async def alist(
772772
parent_checkpoint_id = doc_data["parent_checkpoint_id"]
773773

774774
# Get pending_sends from batch results
775-
pending_sends: List[Tuple[str, bytes]] = []
775+
pending_sends: List[Tuple[str, Union[str, bytes]]] = []
776776
if parent_checkpoint_id:
777777
batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
778778
pending_sends = pending_sends_map.get(batch_key, [])
@@ -1443,7 +1443,7 @@ async def aget_channel_values(
14431443

14441444
async def _aload_pending_sends(
14451445
self, thread_id: str, checkpoint_ns: str = "", parent_checkpoint_id: str = ""
1446-
) -> List[Tuple[str, bytes]]:
1446+
) -> List[Tuple[str, Union[str, bytes]]]:
14471447
"""Load pending sends for a parent checkpoint.
14481448
14491449
Args:
@@ -1640,7 +1640,7 @@ async def _aload_pending_writes(
16401640

16411641
async def _abatch_load_pending_sends(
16421642
self, batch_keys: List[Tuple[str, str, str]]
1643-
) -> Dict[Tuple[str, str, str], List[Tuple[str, bytes]]]:
1643+
) -> Dict[Tuple[str, str, str], List[Tuple[str, Union[str, bytes]]]]:
16441644
"""Batch load pending sends for multiple parent checkpoints.
16451645
16461646
Args:

tests/test_checkpoint_serialization.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,145 @@ def _saver(redis_url: str):
3535
del saver
3636

3737

38+
def test_issue_83_command_resume_no_warning(redis_url: str) -> None:
39+
"""Test that Command(resume={...}) doesn't cause 'invalid packet type' warning (issue #83).
40+
41+
The user reported that Command(resume={'interrupt_id': {'some': 'result'}})
42+
caused warning: "Ignoring invalid packet type <class 'dict'> in pending sends"
43+
This test verifies our fix prevents that warning.
44+
"""
45+
import warnings
46+
from langgraph.constants import TASKS
47+
48+
with _saver(redis_url) as saver:
49+
# Create interrupted checkpoint
50+
interrupted_config = {
51+
"configurable": {
52+
"thread_id": "test-thread-83",
53+
"checkpoint_ns": "",
54+
"checkpoint_id": "interrupted-checkpoint",
55+
}
56+
}
57+
58+
interrupted_checkpoint = {
59+
"v": 1,
60+
"ts": "2024-01-01T00:00:00+00:00",
61+
"id": "interrupted-checkpoint",
62+
"channel_values": {"messages": ["before interrupt"]},
63+
"channel_versions": {},
64+
"versions_seen": {},
65+
"pending_sends": [],
66+
}
67+
68+
metadata = {"source": "loop", "step": 1, "writes": {}}
69+
70+
# Save the interrupted checkpoint
71+
saver.put(interrupted_config, interrupted_checkpoint, metadata, {})
72+
73+
# Simulate Command(resume={'interrupt_id': {'some': 'result'}})
74+
resume_data = {"interrupt_id": {"some": "result"}}
75+
saver.put_writes(
76+
interrupted_config,
77+
[(TASKS, resume_data)], # This puts a dict into TASKS
78+
task_id="resume_task",
79+
)
80+
81+
# Create resumed checkpoint with parent reference
82+
resumed_config = {
83+
"configurable": {
84+
"thread_id": "test-thread-83",
85+
"checkpoint_ns": "",
86+
"checkpoint_id": "resumed-checkpoint",
87+
}
88+
}
89+
90+
resumed_checkpoint = {
91+
"v": 1,
92+
"ts": "2024-01-01T00:01:00+00:00",
93+
"id": "resumed-checkpoint",
94+
"channel_values": {"messages": ["after resume"]},
95+
"channel_versions": {},
96+
"versions_seen": {},
97+
"pending_sends": [],
98+
}
99+
100+
resumed_metadata = {
101+
"source": "loop",
102+
"step": 2,
103+
"writes": {},
104+
"parent_config": interrupted_config
105+
}
106+
107+
saver.put(resumed_config, resumed_checkpoint, resumed_metadata, {})
108+
109+
# Load resumed checkpoint - check for warning
110+
with warnings.catch_warnings(record=True) as w:
111+
warnings.simplefilter("always")
112+
113+
result = saver.get_tuple(resumed_config)
114+
115+
# Check if we get the warning about invalid packet type
116+
dict_warnings = [
117+
warning for warning in w
118+
if "Ignoring invalid packet type" in str(warning.message)
119+
and "dict" in str(warning.message)
120+
]
121+
122+
# Our fix should prevent this warning
123+
assert len(dict_warnings) == 0, f"Got warning: {dict_warnings}"
124+
125+
assert result is not None
126+
assert result.checkpoint["id"] == "resumed-checkpoint"
127+
128+
129+
def test_issue_83_pending_sends_type_compatibility(redis_url: str) -> None:
130+
"""Test that pending_sends work with string blobs from Redis JSON (issue #83).
131+
132+
Issue #83 was caused by type mismatch where _load_pending_sends returned
133+
List[Tuple[str, Union[str, bytes]]] but was annotated as List[Tuple[str, bytes]].
134+
This test verifies the fix works correctly.
135+
"""
136+
with _saver(redis_url) as saver:
137+
checkpoint_dict = {
138+
"v": 1,
139+
"ts": "2024-01-01T00:00:00+00:00",
140+
"id": "test-checkpoint",
141+
"channel_versions": {},
142+
"versions_seen": {},
143+
}
144+
145+
channel_values = {}
146+
147+
# Test with string blobs (what Redis JSON returns)
148+
pending_sends_with_strings = [
149+
("json", '{"test": "value"}'), # String blob from Redis JSON
150+
]
151+
152+
# This should work without type errors
153+
result = saver._load_checkpoint(
154+
checkpoint_dict, channel_values, pending_sends_with_strings
155+
)
156+
157+
assert "pending_sends" in result
158+
assert len(result["pending_sends"]) == 1
159+
assert result["pending_sends"][0] == {"test": "value"}
160+
161+
# Test JsonPlusRedisSerializer compatibility
162+
test_data = {"some": "result", "user_input": "continue"}
163+
164+
# Serialize
165+
type_str, blob = saver.serde.dumps_typed(test_data)
166+
assert isinstance(type_str, str)
167+
assert isinstance(blob, str) # JsonPlusRedisSerializer returns strings
168+
169+
# Deserialize - should work with both string and bytes
170+
result1 = saver.serde.loads_typed((type_str, blob))
171+
result2 = saver.serde.loads_typed((type_str, blob.encode())) # bytes version
172+
173+
assert result1 == test_data
174+
assert result2 == test_data
175+
176+
38177
def test_load_blobs_method(redis_url: str) -> None:
39178
"""Test _load_blobs method with various scenarios.
40179

0 commit comments

Comments
 (0)