Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions langgraph/checkpoint/redis/ashallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,11 @@ async def _aload_pending_sends(

# Extract type and blob pairs
# Handle both direct attribute access and JSON path access
# Filter out documents where blob is None (similar to AsyncRedisSaver in aio.py)
return [
(
getattr(doc, "type", ""),
getattr(doc, "$.blob", getattr(doc, "blob", b"")),
)
(getattr(doc, "type", ""), blob)
for doc in sorted_writes
if (blob := getattr(doc, "$.blob", getattr(doc, "blob", None))) is not None
]

async def _aload_pending_writes(
Expand Down
7 changes: 3 additions & 4 deletions langgraph/checkpoint/redis/shallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,12 +675,11 @@ def _load_pending_sends(

# Extract type and blob pairs
# Handle both direct attribute access and JSON path access
# Filter out documents where blob is None (similar to RedisSaver in __init__.py)
return [
(
getattr(doc, "type", ""),
getattr(doc, "$.blob", getattr(doc, "blob", b"")),
)
(getattr(doc, "type", ""), blob)
for doc in sorted_writes
if (blob := getattr(doc, "$.blob", getattr(doc, "blob", None))) is not None
]

def _make_shallow_redis_checkpoint_key_cached(
Expand Down
71 changes: 71 additions & 0 deletions tests/test_blob_encoding_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,77 @@ def test_load_writes_empty_cases(redis_url: str) -> None:
assert len(result2.pending_writes) == 0


def test_empty_blob_in_pending_sends(redis_url: str) -> None:
"""Test handling of empty byte string (b"") in pending sends as per PR #82 review suggestion."""
with _saver(redis_url) as saver:
thread_id = str(uuid4())

# Create a checkpoint
config: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": "",
"checkpoint_id": "test-checkpoint",
}
}

checkpoint = create_checkpoint(
checkpoint=empty_checkpoint(),
channels={},
step=1,
)

saved_config = saver.put(
config, checkpoint, {"source": "test", "step": 1, "writes": {}}, {}
)

# Test 1: Write with empty byte string blob
from langgraph.constants import TASKS

# This tests the edge case where blob is b""
empty_blob_data = b""
saver.put_writes(
saved_config,
[(TASKS, empty_blob_data)], # Empty blob
task_id="empty_blob_task",
)

# Test 2: Write with None-like values that should be handled gracefully
none_like_values = [
("channel1", ""), # Empty string
("channel2", b""), # Empty bytes
("channel3", {}), # Empty dict
("channel4", []), # Empty list
]

saver.put_writes(
saved_config,
none_like_values,
task_id="empty_values_task",
)

# Retrieve and verify all writes are handled correctly
result = saver.get_tuple(saved_config)
assert result is not None

# Check pending writes
pending_writes = result.pending_writes

# Find our writes
empty_blob_writes = [w for w in pending_writes if w[0] == "empty_blob_task"]
empty_values_writes = [w for w in pending_writes if w[0] == "empty_values_task"]

# Verify empty blob is handled correctly
assert len(empty_blob_writes) == 1
assert empty_blob_writes[0][1] == TASKS
assert empty_blob_writes[0][2] == b"" # Should preserve empty bytes

# Verify other empty values are handled
assert len(empty_values_writes) == 4
for write in empty_values_writes:
assert write[2] is not None # Should not be None even for empty values


def test_checkpoint_with_special_characters(redis_url: str) -> None:
"""Test handling of special characters and null bytes in data.

Expand Down
Loading